Python Redis RPC 예제

|

Pythong Redis RPC

redis_rpc.py

import json
import uuid
import redis
import logging

logger = logging.getLogger('redis_rpc')


class TimeoutException(Exception):
    pass


class Server(object):
    def __init__(self, name, host='localhost', port=6379):
        self.__redis = redis.Redis(host=host, port=port)
        self.__prefix = 'pyredisrpc:'
        self.__queue = self.__prefix + name
        self.__response_expire_time = 60
        self.__methods = {}
        self.__is_running = False

    def start(self):
        logging.debug('RedisRpcServer::start()')
        self.__redis.flushall()

        self.__is_running = True
        while self.__is_running:
            _, req_data = self.__redis.blpop(self.__queue)
            req_data = req_data.decode()
            req_args = self.__parse_request(req_data)
            if req_args is None:
                continue

            req_id, method, params, timeout_check = req_args
            if timeout_check and self.__is_timeout_expired(req_id):
                continue

            self.__call_method(req_id, method, params)

    def shutdown(self):
        logging.debug('RedisRpcServer::shutdown()')
        self.__is_running = False

    def __parse_request(self, req_data):
        try:
            req = json.loads(req_data)
        except json.JSONDecodeError:
            logger.warning(f'Exception - invalid request(JSON Dcode error)\n{req_data}')
            return

        try:
            req_id = req['id']
            method = req['method']
            params = req['params']
        except KeyError as e:
            key = e.args[0]
            self.__send_response(req_id, None, f'Exception - invalid request key({key})')
            return

        if method not in self.__methods:
            self.__send_response(req_id, None, f'Exception - invalid method({method})')
            return

        if type(params) != list or len(params) != 2 or type(params[0]) != list or type(params[1]) != dict:
            self.__send_response(req_id, None, f'Exception - invalid params({params})')
            return

        timeout_check = req.get('timeout') == 1

        return req_id, method, params, timeout_check

    def __is_timeout_expired(self, req_id):
        timeout_key = self.__prefix + req_id + ':timeout'
        return self.__redis.get(timeout_key) is None

    def __call_method(self, req_id, method, params):
        func = self.__methods[method]
        params_args = params[0]
        params_kw = params[1]

        try:
            result = func(*params_args, **params_kw)
            logger.info(f'{method}() is requested. params: {params}, result: {result}')
            self.__send_response(req_id, result)
        except Exception as e:
            self.__send_response(req_id, None, f'Exception - {repr(e)}')

    def __send_response(self, req_id, result, error_message=None):
        if error_message is not None:
            logger.warning(f'Exception - {error_message}')

        result = {'id': req_id, 'result': result, 'error': error_message}
        
        key = self.__prefix + req_id
        self.__redis.rpush(key, json.dumps(result))
        self.__redis.expire(key, self.__response_expire_time)

    def method(self, f):
        self.__methods[f.__name__] = f


class Client(object):
    def __init__(self, name, host='localhost', port=6379, timeout=0):
        self.__redis = redis.Redis(host=host, port=port)
        self.__prefix = 'pyredisrpc:'
        self.__queue = self.__prefix + name
        self.__timeout = timeout

    def __call(self, method, params):
        req_id = uuid.uuid4().hex
        req = {'id': req_id, 'method': method, 'params': params}

        if self.__timeout != 0:
            req['timeout'] = 1
            timeout_key = self.__prefix + req_id + ':timeout'
            self.__redis.set(timeout_key, 1, self.__timeout)

        self.__redis.rpush(self.__queue, json.dumps(req))
        key = self.__prefix + req_id
        res = self.__redis.blpop(key, self.__timeout)

        if not res:
            raise TimeoutException(req, key)

        _, response_data = res
        response = json.loads(response_data.decode())

        if response['error'] is not None:
            raise Exception(response['error'])

        return response['result']

    def __getattr__(self, method):
        def wrap(*args, **kw):
            return self.__call(method, [args, kw])

        return wrap

server.py

import logging

import redis_rpc as pyredisrpc
from colored_log_handler import ColoredLogHandler

logging.basicConfig(level="DEBUG", handlers=[ColoredLogHandler()])
logger = logging.getLogger('main')

server = pyredisrpc.Server('snowdeer-redis-rpc-example')


@server.method
def add(a, b):
    return a + b


try:
    server.start()
except KeyboardInterrupt:
    print("Interrupted")

server.shutdown()

client.py

import logging
import time
import random

import redis_rpc as pyredisrpc
from colored_log_handler import ColoredLogHandler

logging.basicConfig(level="DEBUG", handlers=[ColoredLogHandler()])

client = pyredisrpc.Client('snowdeer-redis-rpc-example', timeout=1)

for i in range(0, 200):
    x = random.randint(1, 10)
    y = random.randint(1, 10)

    ret = client.add(x, y)

    if ret == (x + y):
        logging.info(f'# test({i}) - ({x}, {y} --> {ret})')
    else:
        logging.warning(f'# test({i}) - ({x}, {y} --> {ret})')

    time.sleep(0.5)