1👍
The issue is that the UserRateThrottle
will always be applied, so even if you get the tests working, subscribers will still be bound to the default UserRateTrottle
rate in production.
The solution is to have both types of throttling in one class:
REST_FRAMEWORK = {
'DEFAULT_PERMISSION_CLASSES': (
'rest_framework.permissions.IsAuthenticated',
),
'DEFAULT_RENDERER_CLASSES': (
'rest_framework.renderers.JSONRenderer',
),
'DEFAULT_AUTHENTICATION_CLASSES': [
'rest_framework.authentication.TokenAuthentication',
],
'DEFAULT_THROTTLE_CLASSES': [
'api.throttling.SubscriptionDailyRateThrottle',
],
'DEFAULT_THROTTLE_RATES': {
'user': '10/second',
}
}
And then differentiate the rate and duration based on whether the user is logged in or not:
class SubscriptionDailyRateThrottle(UserRateThrottle):
# Define a custom scope name to be referenced by DRF in settings.py
scope = "subscription"
def __init__(self):
super().__init__()
def get_user_rate(self, user):
# Returns tuple (user plan quota, total seconds in current month)
if user.is_authenticated:
return (custom_subscriber_rate, custom_subscriber_duration)
else:
return (default_rate, default_duration)
def get_cache_key(self, request, view):
# Set cache_key scope and ident based on subscriber or anon.
if request.user and request.user.is_authenticated:
ident = request.user.pk
scope = self.scope
else:
ident = self.get_ident(request)
scope = 'anon'
return self.cache_format % {'scope': scope, 'ident': ident}
def custom_throttle_success(self):
"""
Inserts the current request's timestamp along with the key
into the cache.
"""
self.history.insert(0, self.now)
self.cache.set(self.key, self.history, self.duration)
return True
def allow_request(self, request, view):
"""
Override rest_framework.throttling.SimpleRateThrottle.allow_request
Check to see if the request should be throttled.
On success calls `throttle_success`.
On failure calls `throttle_failure`.
"""
# Get limit and duration for all requests...
limit, duration = self.get_user_rate(request.user)
self.duration = duration
self.num_requests = limit
self.key = self.get_cache_key(request, view)
if self.key is None:
return True
self.history = self.cache.get(self.key, [])
self.now = self.timer()
# Drop any requests from the history which have now passed the throttle duration
while self.history and self.history[-1] <= self.now - self.duration:
self.history.pop()
if len(self.history) >= self.num_requests:
return self.throttle_failure()
return self.custom_throttle_success()
👤tghw
Source:stackexchange.com