限流- Resilience4j-AtomicRateLimiter

/* * *Copyright 2016 Robert Winkler and Bohdan Storozhuk * *Licensed under the Apache License, Version 2.0 (the "License"); *you may not use this file except in compliance with the License. *You may obtain a copy of the License at * *http://www.apache.org/licenses/LICENSE-2.0 * *Unless required by applicable law or agreed to in writing, software *distributed under the License is distributed on an "AS IS" BASIS, *WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. *See the License for the specific language governing permissions and *limitations under the License. * * */ package io.github.resilience4j.ratelimiter.internal; import io.github.resilience4j.ratelimiter.RateLimiter; import io.github.resilience4j.ratelimiter.RateLimiterConfig; import io.github.resilience4j.ratelimiter.event.RateLimiterOnDrainedEvent; import io.github.resilience4j.ratelimiter.event.RateLimiterOnFailureEvent; import io.github.resilience4j.ratelimiter.event.RateLimiterOnSuccessEvent; import io.vavr.collection.HashMap; import io.vavr.collection.Map; import java.time.Duration; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.function.UnaryOperator; import static java.lang.Long.min; import static java.lang.System.nanoTime; import static java.lang.Thread.currentThread; import static java.util.concurrent.locks.LockSupport.parkNanos; /** * {@link AtomicRateLimiter} splits all nanoseconds from the start of epoch into cycles. * Each cycle has duration of {@link RateLimiterConfig#getLimitRefreshPeriod} in nanoseconds. * By contract on start of each cycle {@link AtomicRateLimiter} should * set {@link State#activePermissions} to {@link RateLimiterConfig#getLimitForPeriod}. For the {@link * AtomicRateLimiter} callers it is really looks so, but under the hood there is some optimisations * that will skip this refresh if {@link AtomicRateLimiter} is not used actively. 
* All {@link AtomicRateLimiter} updates are atomic and state is encapsulated in {@link
 * AtomicReference} to {@link AtomicRateLimiter.State}
 */
public class AtomicRateLimiter implements RateLimiter {

    private final long nanoTimeStart;
    private final String name;
    private final AtomicInteger waitingThreads;
    private final AtomicReference<State> state;
    private final Map<String, String> tags;
    private final RateLimiterEventProcessor eventProcessor;

    /**
     * Creates a rate limiter without tags.
     *
     * @param name              the name of this rate limiter
     * @param rateLimiterConfig the initial configuration
     */
    public AtomicRateLimiter(String name, RateLimiterConfig rateLimiterConfig) {
        this(name, rateLimiterConfig, HashMap.empty());
    }

    /**
     * Creates a rate limiter whose first cycle (cycle {@code 0}) starts now and already holds
     * {@link RateLimiterConfig#getLimitForPeriod} permissions.
     *
     * @param name              the name of this rate limiter
     * @param rateLimiterConfig the initial configuration
     * @param tags              tags returned by {@link #getTags()}
     */
    public AtomicRateLimiter(String name, RateLimiterConfig rateLimiterConfig,
                             Map<String, String> tags) {
        this.name = name;
        this.tags = tags;
        this.nanoTimeStart = nanoTime();

        waitingThreads = new AtomicInteger(0);
        state = new AtomicReference<>(new State(
            rateLimiterConfig, 0, rateLimiterConfig.getLimitForPeriod(), 0
        ));
        eventProcessor = new RateLimiterEventProcessor();
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public void changeTimeoutDuration(final Duration timeoutDuration) {
        // The new config is derived from the state being replaced (inside the update function),
        // not from an earlier snapshot, so a concurrent changeLimitForPeriod() is not lost.
        state.updateAndGet(currentState -> new State(
            RateLimiterConfig.from(currentState.config)
                .timeoutDuration(timeoutDuration)
                .build(),
            currentState.activeCycle,
            currentState.activePermissions,
            currentState.nanosToWait
        ));
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public void changeLimitForPeriod(final int limitForPeriod) {
        // The new config is derived from the state being replaced (inside the update function),
        // not from an earlier snapshot, so a concurrent changeTimeoutDuration() is not lost.
        state.updateAndGet(currentState -> new State(
            RateLimiterConfig.from(currentState.config)
                .limitForPeriod(limitForPeriod)
                .build(),
            currentState.activeCycle,
            currentState.activePermissions,
            currentState.nanosToWait
        ));
    }

    /**
     * Calculates time elapsed since this rate limiter instance was constructed.
     *
     * @return nanoseconds elapsed since {@link #nanoTimeStart} was captured
     */
    private long currentNanoTime() {
        return nanoTime() - nanoTimeStart;
    }

    /**
     * @return the {@link System#nanoTime()} value captured when this instance was constructed
     */
    long getNanoTimeStart() {
        return this.nanoTimeStart;
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public boolean acquirePermission(final int permits) {
        long timeoutInNanos = state.get().config.getTimeoutDuration().toNanos();
        State modifiedState = updateStateWithBackOff(permits, timeoutInNanos);
        boolean result = waitForPermissionIfNecessary(timeoutInNanos, modifiedState.nanosToWait);
        publishRateLimiterAcquisitionEvent(result, permits);
        return result;
    }

    /**
     * {@inheritDoc}
     *
     * <p>This implementation returns {@code 0} when the permits are available immediately, the
     * number of nanoseconds to wait when they become available within the configured timeout,
     * and {@code -1} otherwise. It never blocks the caller.
     */
    @Override
    public long reservePermission(final int permits) {
        long timeoutInNanos = state.get().config.getTimeoutDuration().toNanos();
        State modifiedState = updateStateWithBackOff(permits, timeoutInNanos);

        boolean canAcquireImmediately = modifiedState.nanosToWait <= 0;
        if (canAcquireImmediately) {
            publishRateLimiterAcquisitionEvent(true, permits);
            return 0;
        }

        boolean canAcquireInTime = timeoutInNanos >= modifiedState.nanosToWait;
        if (canAcquireInTime) {
            publishRateLimiterAcquisitionEvent(true, permits);
            return modifiedState.nanosToWait;
        }

        publishRateLimiterAcquisitionEvent(false, permits);
        return -1;
    }

    /**
     * Requests as many permits as the current state reports as active, with a zero timeout, and
     * publishes a {@link RateLimiterOnDrainedEvent} if there are event consumers.
     */
    @Override
    public void drainPermissions() {
        AtomicRateLimiter.State prev;
        AtomicRateLimiter.State next;
        do {
            prev = state.get();
            next = calculateNextState(prev.activePermissions, 0, prev);
        } while (!compareAndSet(prev, next));
        if (eventProcessor.hasConsumers()) {
            // NOTE(review): Math.min(x, 0) is never positive, so this event always reports 0 or
            // a negative number of permits. Confirm whether Math.max was intended.
            eventProcessor.consumeEvent(
                new RateLimiterOnDrainedEvent(name, Math.min(prev.activePermissions, 0)));
        }
    }

    /**
     * Atomically updates the current {@link State} with the results of applying the {@link
     * AtomicRateLimiter#calculateNextState}, returning the updated {@link State}. It differs from
     * {@link AtomicReference#updateAndGet(UnaryOperator)} by constant back off. It means that after
     * one try to {@link AtomicReference#compareAndSet(Object, Object)} this method will wait for a
     * while before try one more time. This technique was originally described in a paper (the
     * link is missing from this copy of the source) and showed great results with
     * {@link AtomicRateLimiter} in benchmark tests.
     *
     * @param permits        number of permits to reserve
     * @param timeoutInNanos max time that caller can wait for permission in nanoseconds
     * @return the updated value
     */
    private State updateStateWithBackOff(final int permits, final long timeoutInNanos) {
        AtomicRateLimiter.State prev;
        AtomicRateLimiter.State next;
        do {
            prev = state.get();
            next = calculateNextState(permits, timeoutInNanos, prev);
        } while (!compareAndSet(prev, next));
        return next;
    }

    /**
     * Atomically sets the value to the given updated value if the current value {@code ==} the
     * expected value. On failure it parks the calling thread for a constant, minimal duration
     * before returning, so that retry loops built on this method back off between attempts.
     *
     * @param current the expected value
     * @param next    the new value
     * @return {@code true} if successful. False return indicates that the actual value was not
     * equal to the expected value.
     */
    private boolean compareAndSet(final State current, final State next) {
        if (state.compareAndSet(current, next)) {
            return true;
        }
        parkNanos(1); // back-off
        return false;
    }

    /**
     * A side-effect-free function that can calculate next {@link State} from current. It determines
     * time duration that you should wait for the given number of permits and reserves it for you,
     * if you'll be able to wait long enough.
     *
     * @param permits        number of permits
     * @param timeoutInNanos max time that caller can wait for permission in nanoseconds
     * @param activeState    current state of {@link AtomicRateLimiter}
     * @return next {@link State}
     */
    private State calculateNextState(final int permits, final long timeoutInNanos,
                                     final State activeState) {
        long cyclePeriodInNanos = activeState.config.getLimitRefreshPeriod().toNanos();
        int permissionsPerCycle = activeState.config.getLimitForPeriod();

        long currentNanos = currentNanoTime();
        long currentCycle = currentNanos / cyclePeriodInNanos;

        long nextCycle = activeState.activeCycle;
        int nextPermissions = activeState.activePermissions;
        if (nextCycle != currentCycle) {
            // Lazily apply the refreshes of every cycle that elapsed since the stored state,
            // capping the result at one cycle's worth of permissions.
            long elapsedCycles = currentCycle - nextCycle;
            long accumulatedPermissions = elapsedCycles * permissionsPerCycle;
            nextCycle = currentCycle;
            nextPermissions = (int) min(nextPermissions + accumulatedPermissions,
                permissionsPerCycle);
        }
        long nextNanosToWait = nanosToWaitForPermission(
            permits, cyclePeriodInNanos, permissionsPerCycle, nextPermissions, currentNanos,
            currentCycle
        );
        return reservePermissions(activeState.config, permits, timeoutInNanos, nextCycle,
            nextPermissions, nextNanosToWait);
    }

    /**
     * Calculates time to wait for the required permits of permissions to get accumulated.
     *
     * @param permits              permits of required permissions
     * @param cyclePeriodInNanos   current configuration values
     * @param permissionsPerCycle  current configuration values
     * @param availablePermissions currently available permissions, can be negative if some
     *                             permissions have been reserved
     * @param currentNanos         current time in nanoseconds
     * @param currentCycle         current {@link AtomicRateLimiter} cycle
     * @return nanoseconds to wait for the next permission
     */
    private long nanosToWaitForPermission(final int permits, final long cyclePeriodInNanos,
                                          final int permissionsPerCycle,
                                          final int availablePermissions, final long currentNanos,
                                          final long currentCycle) {
        if (availablePermissions >= permits) {
            return 0L;
        }
        long nextCycleTimeInNanos = (currentCycle + 1) * cyclePeriodInNanos;
        long nanosToNextCycle = nextCycleTimeInNanos - currentNanos;
        int permissionsAtTheStartOfNextCycle = availablePermissions + permissionsPerCycle;
        // Full cycles still missing after the next refresh; zero when that refresh suffices.
        int fullCyclesToWait = divCeil(-(permissionsAtTheStartOfNextCycle - permits),
            permissionsPerCycle);
        return (fullCyclesToWait * cyclePeriodInNanos) + nanosToNextCycle;
    }

    /**
     * Divide two integers and round result to the bigger near mathematical integer.
     *
     * @param x - should be > 0
     * @param y - should be > 0
     */
    private static int divCeil(int x, int y) {
        return (x + y - 1) / y;
    }

    /**
     * Determines whether caller can acquire permission before timeout or not and then creates
     * corresponding {@link State}. Reserves permissions only if caller can successfully wait for
     * permission.
     *
     * @param config         configuration to carry over into the new {@link State}
     * @param permits        permits of permissions
     * @param timeoutInNanos max time that caller can wait for permission in nanoseconds
     * @param cycle          cycle for new {@link State}
     * @param permissions    permissions for new {@link State}
     * @param nanosToWait    nanoseconds to wait for the next permission
     * @return new {@link State} with possibly reserved permissions and time to wait
     */
    private State reservePermissions(final RateLimiterConfig config, final int permits,
                                     final long timeoutInNanos, final long cycle,
                                     final int permissions, final long nanosToWait) {
        boolean canAcquireInTime = timeoutInNanos >= nanosToWait;
        int permissionsWithReservation = permissions;
        if (canAcquireInTime) {
            permissionsWithReservation -= permits;
        }
        return new State(config, cycle, permissionsWithReservation, nanosToWait);
    }

    /**
     * If nanosToWait is bigger than 0 it tries to park {@link Thread} for nanosToWait but not
     * longer then timeoutInNanos. When the wait would exceed the timeout, the caller is still
     * parked for timeoutInNanos and {@code false} is returned.
     *
     * @param timeoutInNanos max time that caller can wait
     * @param nanosToWait    nanoseconds caller need to wait
     * @return true if caller was able to wait for nanosToWait without {@link Thread#interrupt} and
     * not exceed timeout
     */
    private boolean waitForPermissionIfNecessary(final long timeoutInNanos,
                                                 final long nanosToWait) {
        boolean canAcquireImmediately = nanosToWait <= 0;
        boolean canAcquireInTime = timeoutInNanos >= nanosToWait;

        if (canAcquireImmediately) {
            return true;
        }
        if (canAcquireInTime) {
            return waitForPermission(nanosToWait);
        }
        waitForPermission(timeoutInNanos);
        return false;
    }

    /**
     * Parks {@link Thread} for nanosToWait.
     * <p>If the current thread is {@linkplain Thread#interrupted}
     * while waiting for a permit then it won't throw {@linkplain InterruptedException}, but its
     * interrupt status will be set.
     *
     * @param nanosToWait nanoseconds caller need to wait
     * @return true if caller was not {@link Thread#interrupted} while waiting
     */
    private boolean waitForPermission(final long nanosToWait) {
        waitingThreads.incrementAndGet();
        long deadline = currentNanoTime() + nanosToWait;
        boolean wasInterrupted = false;
        // parkNanos may return early, so keep parking until the deadline passes.
        while (currentNanoTime() < deadline && !wasInterrupted) {
            long sleepBlockDuration = deadline - currentNanoTime();
            parkNanos(sleepBlockDuration);
            wasInterrupted = Thread.interrupted();
        }
        waitingThreads.decrementAndGet();
        if (wasInterrupted) {
            // Thread.interrupted() cleared the flag; restore it for the caller.
            currentThread().interrupt();
        }
        return !wasInterrupted;
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public String getName() {
        return name;
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public RateLimiterConfig getRateLimiterConfig() {
        return state.get().config;
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public Map<String, String> getTags() {
        return tags;
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public Metrics getMetrics() {
        return new AtomicRateLimiterMetrics();
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public EventPublisher getEventPublisher() {
        return eventProcessor;
    }

    @Override
    public String toString() {
        return "AtomicRateLimiter{" +
            "name='" + name + '\'' +
            ", rateLimiterConfig=" + state.get().config +
            '}';
    }

    /**
     * Get the enhanced Metrics with some implementation specific details.
     *
     * @return the detailed metrics
     */
    public AtomicRateLimiterMetrics getDetailedMetrics() {
        return new AtomicRateLimiterMetrics();
    }

    /**
     * Publishes a success or failure event for an acquisition attempt; does nothing when the
     * event processor has no consumers.
     *
     * @param permissionAcquired whether the permits were granted
     * @param permits            number of permits that were requested
     */
    private void publishRateLimiterAcquisitionEvent(boolean permissionAcquired, int permits) {
        if (!eventProcessor.hasConsumers()) {
            return;
        }
        if (permissionAcquired) {
            eventProcessor.consumeEvent(new RateLimiterOnSuccessEvent(name, permits));
            return;
        }
        eventProcessor.consumeEvent(new RateLimiterOnFailureEvent(name, permits));
    }

    /**
     * <p>{@link AtomicRateLimiter.State} represents immutable state of {@link AtomicRateLimiter}
     * where:
     * <ul>
     * <li>activeCycle - {@link AtomicRateLimiter} cycle number that was used
     * by the last {@link AtomicRateLimiter#acquirePermission()} call.</li>
     * <li>activePermissions - count of available permissions after
     * the last {@link AtomicRateLimiter#acquirePermission()} call.
     * Can be negative if some permissions where reserved.</li>
     * <li>nanosToWait - count of nanoseconds to wait for permission for
     * the last {@link AtomicRateLimiter#acquirePermission()} call.</li>
     * </ul>
     */
    private static class State {

        private final RateLimiterConfig config;
        private final long activeCycle;
        private final int activePermissions;
        private final long nanosToWait;

        private State(RateLimiterConfig config,
                      final long activeCycle, final int activePermissions,
                      final long nanosToWait) {
            this.config = config;
            this.activeCycle = activeCycle;
            this.activePermissions = activePermissions;
            this.nanosToWait = nanosToWait;
        }
    }

    /**
     * Enhanced {@link Metrics} with some implementation specific details.
     *
     * <p>The estimating getters compute a next state for one permit with a timeout of
     * {@code -1} and never store it, so reading metrics reserves nothing.
     */
    public class AtomicRateLimiterMetrics implements Metrics {

        private AtomicRateLimiterMetrics() {
        }

        /**
         * {@inheritDoc}
         */
        @Override
        public int getNumberOfWaitingThreads() {
            return waitingThreads.get();
        }

        /**
         * {@inheritDoc}
         */
        @Override
        public int getAvailablePermissions() {
            State currentState = state.get();
            State estimatedState = calculateNextState(1, -1, currentState);
            return estimatedState.activePermissions;
        }

        /**
         * @return estimated time duration in nanos to wait for the next permission
         */
        public long getNanosToWait() {
            State currentState = state.get();
            State estimatedState = calculateNextState(1, -1, currentState);
            return estimatedState.nanosToWait;
        }

        /**
         * @return estimated current cycle
         */
        public long getCycle() {
            State currentState = state.get();
            State estimatedState = calculateNextState(1, -1, currentState);
            return estimatedState.activeCycle;
        }
    }
}

    推荐阅读