Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update the latest code #17

Merged
merged 3 commits into from
Dec 6, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.dubbo.common.extension.ExtensionLoader;
import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.utils.StringUtils;
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.RpcException;
Expand All @@ -32,6 +33,7 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

/**
* Abstract implementation of Directory: Invoker list returned from this Directory's list method have been filtered by Routers
Expand Down Expand Up @@ -62,7 +64,14 @@ public AbstractDirectory(URL url, URL consumerUrl, List<Router> routers) {
if (url == null) {
throw new IllegalArgumentException("url == null");
}
this.url = url;

if (url.getProtocol().equals(Constants.REGISTRY_PROTOCOL)) {
Map<String, String> queryMap = StringUtils.parseQueryString(url.getParameterAndDecoded(Constants.REFER_KEY));
this.url = url.clearParameters().addParameters(queryMap).removeParameter(Constants.MONITOR_KEY);
} else {
this.url = url;
}

this.consumerUrl = consumerUrl;
setRouters(routers);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,48 +26,75 @@

/**
* LeastActiveLoadBalance
* <p>
* Filter the number of invokers with the least number of active calls and count the weights and quantities of these invokers.
* If there is only one invoker, use the invoker directly;
* if there are multiple invokers and the weights are not the same, then random according to the total weight;
* if there are multiple invokers and the same weight, then randomly called.
*/
public class LeastActiveLoadBalance extends AbstractLoadBalance {

public static final String NAME = "leastactive";

@Override
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
int length = invokers.size(); // Number of invokers
int leastActive = -1; // The least active value of all invokers
int leastCount = 0; // The number of invokers having the same least active value (leastActive)
int[] leastIndexes = new int[length]; // The index of invokers having the same least active value (leastActive)
int totalWeight = 0; // The sum of with warmup weights
int firstWeight = 0; // Initial value, used for comparision
boolean sameWeight = true; // Every invoker has the same weight value?
// Number of invokers
int length = invokers.size();
// The least active value of all invokers
int leastActive = -1;
// The number of invokers having the same least active value (leastActive)
int leastCount = 0;
// The index of invokers having the same least active value (leastActive)
int[] leastIndexes = new int[length];
// The sum of the warmup weights of all the least active invokes
int totalWeight = 0;
// The weight of the first least active invoke
int firstWeight = 0;
// Every least active invoker has the same weight value?
boolean sameWeight = true;

// Filter out all the least active invokers
for (int i = 0; i < length; i++) {
Invoker<T> invoker = invokers.get(i);
int active = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName()).getActive(); // Active number
// Get the active number of the invoke
int active = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName()).getActive();
// Get the weight of the invoke configuration. The default value is 100.
int afterWarmup = getWeight(invoker, invocation);
if (leastActive == -1 || active < leastActive) { // Restart, when find a invoker having smaller least active value.
leastActive = active; // Record the current least active value
leastCount = 1; // Reset leastCount, count again based on current leastCount
leastIndexes[0] = i; // Reset
totalWeight = afterWarmup; // Reset
firstWeight = afterWarmup; // Record the weight the first invoker
sameWeight = true; // Reset, every invoker has the same weight value?
} else if (active == leastActive) { // If current invoker's active value equals with leaseActive, then accumulating.
leastIndexes[leastCount++] = i; // Record index number of this invoker
totalWeight += afterWarmup; // Add this invoker's with warmup weight to totalWeight.
// If it is the first invoker or the active number of the invoker is less than the current least active number
if (leastActive == -1 || active < leastActive) {
// Reset the active number of the current invoker to the least active number
leastActive = active;
// Reset the number of least active invokers
leastCount = 1;
// Put the first least active invoker first in leastIndexs
leastIndexes[0] = i;
// Reset totalWeight
totalWeight = afterWarmup;
// Record the weight the first least active invoker
firstWeight = afterWarmup;
// Each invoke has the same weight (only one invoker here)
sameWeight = true;
// If current invoker's active value equals with leaseActive, then accumulating.
} else if (active == leastActive) {
// Record the index of the least active invoker in leastIndexs order
leastIndexes[leastCount++] = i;
// Accumulate the total weight of the least active invoker
totalWeight += afterWarmup;
// If every invoker has the same weight?
if (sameWeight && i > 0
&& afterWarmup != firstWeight) {
sameWeight = false;
}
}
}
// assert(leastCount > 0)
// Choose an invoker from all the least active invokers
if (leastCount == 1) {
// If we got exactly one invoker having the least active value, return this invoker directly.
return invokers.get(leastIndexes[0]);
}
if (!sameWeight && totalWeight > 0) {
// If (not every invoker has the same weight & at least one invoker's weight>0), select randomly based on totalWeight.
// If (not every invoker has the same weight & at least one invoker's weight>0), select randomly based on
// totalWeight.
int offsetWeight = ThreadLocalRandom.current().nextInt(totalWeight) + 1;
// Return a invoker based on the random value.
for (int i = 0; i < leastCount; i++) {
Expand All @@ -81,4 +108,4 @@ protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation
// If all invokers have the same weight value or totalWeight=0, return evenly.
return invokers.get(leastIndexes[ThreadLocalRandom.current().nextInt(leastCount)]);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.util.concurrent.TimeUnit;

import org.apache.dubbo.common.utils.NamedThreadFactory;
import org.junit.Ignore;
import org.junit.Test;


Expand Down Expand Up @@ -87,6 +88,7 @@ public void run() {


@Test
@Ignore
public void testCustomExecutor() {
Executor mockedExecutor = mock(Executor.class);
CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
Expand Down