ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • [Django] Redis로 API rate limit 해결하기
    개발일지/개발일지 2021. 7. 4. 22:14

    백엔드 개발을 하다보면 외부 서비스가 제공하는 API를 사용하는 경우가 많습니다. 예를 들어, 다른 서비스의 유저 정보를 조회를 하거나, 다른 어플리케이션의 status를 확인하는 경우 등 입니다. 그리고 API request의 시간을 줄이기 위해 비동기적으로 구현하여 동시에 많은량의 API request 신호를 보낼수도 있습니다.

    만약 외부 API에 "20 requests per second" 같은 rate limit이 존재한다면 구현이 조금 복잡해집니다. API request의 속도를 높이기 위해 비동기, 멀티쓰레딩 등 방법을 사용하여 동시에 여러 API requests를 발생시키도록 하였는데, 만약 우리의 어플리케이션을 동시에 여러 유저들이 사용하면 주어진 rate limit을 쉽게 초과해버리기 때문입니다. 그래서 이번 포스팅에서는 Redis로 간단한 메시지큐를 구현하여 rate limit 문제를 해결하는 방법을 소개해드리고자 합니다.

     

     

    Redis는 메인메모리 상에서 동작하는 key-value구조의 nosql 입니다. 백엔드에서 주로 캐시로도 많이 사용됩니다. 이번에는 Redis가 지원하는 collections(자료구조) 중 List를 이용하여 간단한 메시지 큐를 구현해보도록 하겠습니다.

     

    위 이미지는 Riot Games에서 유저 정보를 가져오기 위한 장고의 동작을 간단히 나타낸것입니다. 만약 우리의 어플리케이션의 이용하는 유저가 적으면 상관없지만 아래 이미지처럼 동시에 많은 접속이 발생하면 Limit을 초과해 Riot에서 제공하는 API가 제대로된 데이터를 주지 않을것입니다. 

     

    그런데 위처럼 동시에 접속하는 모든 유저의 request를 Redis로 구현한 메시지 큐에 적재를 하고 worker가 메시지 큐에 쌓인 task를 rate limit을 넘지않는 속도로 처리하면 항상 최상의 속도로 Riot Games의 API를 이용할 수 있게 됩니다.

     

     

     

     

    1. Redis 설치

    $ wget http://download.redis.io/redis-stable.tar.gz
    $ tar xvzf redis-stable.tar.gz
    $ cd redis-stable
    $ make
    $ redis-server # Redis 실행
    $ redis-cli ping # 핑 확인 pong이 뜨면 이상 무
    $ pip install django-redis
    $ pip install redis

     

     

    2. 프로젝트 디렉토리에서 Redis class 작성

    #Redis.py
    import redis
    import datetime
    
    REDIS_HOST = "localhost"
    REDIS_PORT = 6379
    REDIS_DBNUM = 0
    
    class RedisQueue(object):
        
        def __init__(self, name):
            """
                host='localhost', port=6379, db=0
            """
            self.key = name
            self.rq = redis.StrictRedis(host=REDIS_HOST, port=REDIS_PORT, db=REDIS_DBNUM)
    
        def size(self): # 큐 크기 확인
            return self.rq.llen(self.key)
    
        def isEmpty(self): # 비어있는 큐인지 확인
            return self.size() == 0
    
        def push(self, element): # 데이터 넣기
            self.rq.rpush(self.key, element)
        
        def getlist(self, num):
            ans = []
            for _ in range(num):
                x = self.rq.lpop(self.key)
                if(x==None):
                    break
                ans.append(x.decode())
            return ans
    
        def setval(self, key, value):
            self.rq.set(key, str(value), datetime.timedelta(seconds=100))
    
        def exist_by_key(self, key):
            return self.rq.exists(key)
    
        def get_by_key(self, key):
            return self.rq.get(key)
        
        def del_by_key(self, key):
            self.rq.delete(key)

    Redis는 로컬환경에서 실행할 것이므로  HOST는 localhost를 사용하고 기본으로 6379번 포트를 사용합니다. 그리고 몇몇 필요한 메소드를 작성하였는데 추가로 필요한 기능이 있으면 메소드를 직접 추가하여 작성하시면 됩니다.

     

     

     

    3. 간단한 예시

    #views.py
    from "Redis위치" import RedisQueue
    
    
    def matchinfo(request):
        if request.method == 'GET':
        
     	.......
        
    
            rq = RedisQueue("matches")
            for match_id in matchlist:
                rq.push(match_id)
    
            matches_result=[]
            for match_id in matchlist:
                while rq.exist_by_key(match_id)==0:
                    time.sleep(0.2)
    
                result = rq.get_by_key(match_id).decode().replace("'", "\"")
                if result == "None":
                    pass
                else:
                    matches_result.append(json.loads(result))
                rq.del_by_key(match_id)
    
    
    	........

    RedisQueue("사용할 key") 생성자로 사용 할 Redis 객체 rq를 생성해 주고, rq.push를 통해 메시지 큐에 task를 쌓아줬습니다. 그러고 저는 worker에서의 output을 redis에 다시 저장하는 방식을 사용하였기에 output이 나올때까지

    while(output exist?) 문에서 기다리는 busy waiting 방법으로 대기를 시킨 후, rq로 부터 output을 받아와 바로 리스트에 저장을 해줬습니다. 마지막으로 메모리 누수 방지를 위해 사용 한 key값은 삭제하도록 하였습니다.

    저는 간단한 프로젝트를 하였으므로 위 코드 정도로 끝났지만 조금 더 정밀한 서비스를 위해서는 redis로 부터 output이 오지 않은 경우 등 예외처리를 해줘야 합니다. 

     

     

    4. worker

    worker 같은 경우 간단히 crontab을 사용하거나 django-celery를 이용하시면 됩니다. 사용할 방법이 다양해 이 포스팅에서는 자세히는 다루지는 않겠지만, 저 같은 경우는 django-celery와 아래처럼 스케쥴러 함수를 작성하여 매초마다 메시지큐에 task가 존재하는지 검사를 하고, 만약 task가 존재한다면 비동기로 제한된 개수(NUMS_BY_ONETIME)만큼 task를 처리하도록 하였습니다. 이 같은 방법으로 외부 자원의 rate limit을 초과하지는 않지만 항상 최상의 속도를 유지할 수 있게됩니다.

    NUMS_BY_ONETIME = 10
    
    @shared_task
    def api_scheduler():
        rq = RedisQueue("matches")
    
        if rq.isEmpty():
            return None
    
        matchidlist = rq.getlist(NUMS_BY_ONETIME)
    
        asyncio.run(matchlist_async(matchidlist, rq))
        return(len(matchidlist))

     

     

     

    Redis를 메시지큐로 활용하는 간단한 방법에 대해 포스팅하였습니다. 만약 더 좋은 방법이나 라이브러리가 있다면, 댓글로 달아주시면 정말 감사하겠습니다.

    댓글

Designed by Tistory.