Python Blocking Socket 및 Non-Blocking Socket 예제

|

Python Blocking Socket 및 Non-Blocking Socket 예제

Blocking Socket 예제

다음 코드는 httpbin.org/delay/3 서브도메인에 접속해서 3초 딜레이 이후 응답을 받는 코드입니다. s.recv 코드에서 3초동안 Blocking 되서 동작합니다.

import socket

s = socket.create_connection(('httpbin.org', 80))
s.sendall(b'GET /delay/3 HTTP/1.1\r\nHost: httpbin.org\r\n\r\n')

buf = s.recv(1024)
print(buf)

Non-Blocking Socket 예제

다음은 Non-Blocking으로 동작하기 때문에 중간에 while 문으로 응답이 완료될 때까지 대기합니다.

import select
import socket

s = socket.create_connection(('httpbin.org', 80))
s.setblocking(False)

s.sendall(b'GET /delay/3 HTTP/1.1\r\nHost: httpbin.org\r\n\r\n')

while True:
    ready_to_read, ready_to_write, in_error = select.select([s], [], [])
    if s in ready_to_read:
        break

buf = s.recv(1024)
print(buf)

select

여기에서 사용한 select는 여러 개의 이벤트 소스를 결합하여 쉽게 모니터링할 수 있는 기술이며, 오래된 기술입니다. 그래서 최상의 성능을 보여주지는 못하며 리눅스의 epoll이나 FreeBSD의 kqueue와 같이 운영체제마다 다른 대안과 최적화를 구현해서 사용하고 있습니다.

파이썬에서는 다음 포스팅에서 다룰 asyncio라는 추상화 계층을 이용해서 사용하는 것이 더 좋습니다.

Python Socket으로 HTTP Request 보내는 방법

|

Socket으로 HTTP Request 보내는 예제 코드

HTTP 1.1 부터는 Header 정보에 Host 정보를 넣어줘야 하며, 마지막 부분에 2개의 \r\n이 필요합니다.

다음 2가지 예제는 모두 같은 코드입니다.

import socket

s = socket.create_connection(('httpbin.org', 80))
s.sendall(b'GET / HTTP/1.1\r\nHost: httpbin.org\r\n\r\n')

buf = s.recv(1024)
print(buf)
import socket

s = socket.create_connection(('httpbin.org', 80))
s.sendall(b'''GET / HTTP/1.1
Host: httpbin.org

''')

buf = s.recv(1024)
print(buf)

Python futures 예제

|

Futures

파이썬 3.2에 도입된 concurrent.futures 모듈이며, 나중에 파이썬 2에도 적용되었습니다. 파이썬은 concurrent.futures.ThreadPoolExecutorconcurrent.futures.Process.PoolExecutor의 2종류의 Executor를 지원합니다. 각각 Thread 기반과 Process 기반입니다.

간단한 예제

import time
from concurrent import futures


def loop(name, step):
    __sum = 0
    for i in range(0, 10):
        __sum = __sum + step
        print(f'[{name}] is looping({i}) ...')
        time.sleep(0.1)

    return __sum


if __name__ == '__main__':
    print(f'calculating ...')
    with futures.ThreadPoolExecutor(max_workers=3) as executor:
        f1 = executor.submit(loop, 'apple', 1)
        f2 = executor.submit(loop, 'bread', 2)
        f3 = executor.submit(loop, 'carrot', 3)
        f4 = executor.submit(loop, 'david', 4)
        f5 = executor.submit(loop, 'egg', 5)

    print(f'')
    print(f'result #1 : {f1.result()}')
    print(f'result #2 : {f2.result()}')
    print(f'result #3 : {f3.result()}')
    print(f'result #4 : {f4.result()}')
    print(f'result #5 : {f5.result()}')
</pre>

Python에서 RabbitMq 사용하기 (5) - RabbitMq Utlity

|

RabbitMq Utlity

rabbitmq_util.py

from http.client import HTTPConnection
import json
from urllib.parse import quote
from base64 import b64encode


class RabbitMqUtil:
    __OVERVIEW_URL = "/api/overview"
    __VHOSTS_URL = "/api/vhosts"
    __USERS_URL = "/api/users"
    __EXCHANGES_URL = "/api/exchanges"
    __QUEUES_URL = "/api/queues"
    __BINDINGS_URL = "/api/bindings"

    def __init__(self, host="localhost", port=15672, user="guest", passwd="guest"):
        self.__host = host
        self.__port = port
        encoded_auth = b64encode(f"{user}:{passwd}".encode()).decode()
        self.__header = {"Authorization": f"Basic {encoded_auth}",
                         "content-type": "application/json"}

    def __request(self, method, url, body=None):
        connection = HTTPConnection(self.__host, self.__port, timeout=5)
        connection.request(method=method,
                           url=url,
                           headers=self.__header,
                           body=body)
        response = connection.getresponse()
        code = response.getcode()
        content = response.read()
        response.close()
        connection.close()

        return code, content

    def get_overview(self):
        return self.__request("GET", RabbitMqUtil.__OVERVIEW_URL)

    def get_users(self):
        result = []
        code, content = self.__request("GET", RabbitMqUtil.__USERS_URL)
        users = json.loads(content)
        for item in users:
            result.append(item["name"])

        return result

    def get_vhosts(self):
        result = []
        code, content = self.__request("GET", RabbitMqUtil.__VHOSTS_URL)
        vhosts = json.loads(content)
        for item in vhosts:
            result.append(item["name"])

        return result

    def create_vhosts(self, name):
        name = quote(name, "")
        code, content = self.__request("PUT", f"{RabbitMqUtil.__VHOSTS_URL}/{name}")

        return (code == 201) or (code == 204)

    def delete_vhosts(self, name):
        name = quote(name, "")
        code, content = self.__request("DELETE", f"{RabbitMqUtil.__VHOSTS_URL}/{name}")

        return (code == 201) or (code == 204)

    def get_exchanges(self, vhost):
        result = []
        vhost = quote(vhost, "")
        code, content = self.__request("GET", f"{RabbitMqUtil.__EXCHANGES_URL}/{vhost}")
        exchanges = json.loads(content)
        for item in exchanges:
            result.append(item["name"])

        return result

    def create_exchange(self, vhost, exchange,
                        exchange_type="fanout",
                        auto_delete=False,
                        durable=True,
                        internal=False,
                        arguments=None):
        vhost = quote(vhost, '')
        exchange = quote(exchange, '')
        base_body = {"type": exchange_type,
                     "auto_delete": auto_delete,
                     "durable": durable,
                     "internal": internal,
                     "arguments": arguments or list()}
        body = json.dumps(base_body)
        code, content = self.__request("PUT", f"{RabbitMqUtil.__EXCHANGES_URL}/{vhost}/{exchange}", body)

        return (code == 201) or (code == 204)

    def delete_exchange(self, vhost, exchange):
        vhost = quote(vhost, '')
        exchange = quote(exchange, '')
        code, content = self.__request("DELETE", f"{RabbitMqUtil.__EXCHANGES_URL}/{vhost}/{exchange}")

        return (code == 201) or (code == 204)

    def get_queues(self, vhost):
        result = []
        vhost = quote(vhost, "")
        code, content = self.__request("GET", f"{RabbitMqUtil.__QUEUES_URL}/{vhost}")
        exchanges = json.loads(content)
        for item in exchanges:
            result.append(item["name"])

        return result

    def create_queue(self, vhost, queue, **kwargs):
        vhost = quote(vhost, '')
        queue = quote(queue, '')
        body = json.dumps(kwargs)
        code, content = self.__request("PUT", f"{RabbitMqUtil.__QUEUES_URL}/{vhost}/{queue}", body)

        return (code == 201) or (code == 204)

    def delete_queue(self, vhost, queue):
        vhost = quote(vhost, '')
        queue = quote(queue, '')
        code, content = self.__request("DELETE", f"{RabbitMqUtil.__QUEUES_URL}/{vhost}/{queue}")

        return (code == 201) or (code == 204)

    def purge_queue(self, vhost, queue):
        vhost = quote(vhost, '')
        queue = quote(queue, '')
        code, content = self.__request("DELETE", f"{RabbitMqUtil.__QUEUES_URL}/{vhost}/{queue}/contents")

        return (code == 201) or (code == 204)

    def get_bindings(self, vhost):
        vhost = quote(vhost, '')
        code, content = self.__request("GET", f"{RabbitMqUtil.__BINDINGS_URL}/{vhost}")

        return content

    def get_queue_bindings(self, vhost, queue):
        vhost = quote(vhost, '')
        queue = quote(queue, '')
        code, content = self.__request("GET", f"{RabbitMqUtil.__QUEUES_URL}/{vhost}/{queue}/bindings")

        return content

    def create_binding(self, vhost, exchange, queue, routing_key="", args=None):
        vhost = quote(vhost, '')
        exchange = quote(exchange, '')
        queue = quote(queue, '')
        body = json.dumps({"routing_key": routing_key, "arguments": args or {}})
        code, content = self.__request("POST",
                                       f"{RabbitMqUtil.__BINDINGS_URL}/{vhost}/e/{exchange}/q/{queue}",
                                       body)

        return (code == 201) or (code == 204)

    def delete_binding(self, vhost, exchange, queue, routing_key):
        vhost = quote(vhost, '')
        exchange = quote(exchange, '')
        queue = quote(queue, '')
        body = ''
        code, content = self.__request("DELETE",
                                       f"{RabbitMqUtil.__BINDINGS_URL}/{vhost}/e/{exchange}/q/{queue}/{routing_key}",
                                       body)

        return (code == 201) or (code == 204)

main.py

테스트용 코드로 별도로 필요하진 않습니다.

from pyrabbit.rabbitmq_util import RabbitMqUtil

rabbit = RabbitMqUtil("localhost")

VHOST_NAME = "snowdeer_vhost"
EXCHANGE_NAME = "snowdeer_exchange"
QUEUE_NAME = "snowdeer_queue"


def main():
    print("<OVERVIEW>")
    overview = rabbit.get_overview()
    print(f"overview: {overview}")

    test_users()
    test_vhosts(VHOST_NAME)
    test_exchanges(VHOST_NAME, EXCHANGE_NAME)
    test_queues(VHOST_NAME, QUEUE_NAME)
    test_bindings(VHOST_NAME, EXCHANGE_NAME, QUEUE_NAME)
    # delete_resources(VHOST_NAME, EXCHANGE_NAME, QUEUE_NAME)


def test_users():
    print("\n<TEST USERS>")

    users = rabbit.get_users()
    print(f"users: {users}")


def test_vhosts(vhost_name):
    print("\n<TEST VHOSTS>")

    vhosts = rabbit.get_vhosts()
    print(f"vhosts: {vhosts}")

    ret = rabbit.create_vhosts(vhost_name)
    print(f"create vhost: {ret}")

    vhosts = rabbit.get_vhosts()
    print(f"vhosts: {vhosts}")


def test_exchanges(vhost, exchange):
    print("\n<TEST EXCHANGES>")

    exchanges = rabbit.get_exchanges(vhost)
    print(f"exchanges: {exchanges}")

    ret = rabbit.create_exchange(vhost, exchange)
    print(f"create exchanges: {ret}")

    exchanges = rabbit.get_exchanges(vhost)
    print(f"exchanges: {exchanges}")


def test_queues(vhost, queue):
    print("\n<TEST QUEUES>")

    queues = rabbit.get_queues(vhost)
    print(f"queues: {queues}")

    ret = rabbit.create_queue(vhost, queue)
    print(f"create queue: {ret}")

    queues = rabbit.get_queues(vhost)
    print(f"queues: {queues}")

    ret = rabbit.purge_queue(vhost, queue)
    print(f"purge queue: {ret}")


def test_bindings(vhost, exchange, queue):
    print("\n<TEST BINDINGS>")

    bindings = rabbit.get_bindings(vhost)
    print(f"queues: {bindings}")

    bindings = rabbit.get_queue_bindings(vhost, queue)
    print(f"queues: {bindings}")

    ret = rabbit.create_binding(vhost, exchange, queue, routing_key="hello")
    print(f"create binding: {ret}")

    bindings = rabbit.get_bindings(vhost)
    print(f"queues: {bindings}")

    bindings = rabbit.get_queue_bindings(vhost, queue)
    print(f"queues: {bindings}")


def delete_resources(vhost, exchange, queue):
    ret = rabbit.delete_queue(vhost, queue)
    print(f"delete queue: {ret}")

    queues = rabbit.get_queues(vhost)
    print(f"queues: {queues}")

    ret = rabbit.delete_exchange(vhost, exchange)
    print(f"delete exchanges: {ret}")

    exchanges = rabbit.get_exchanges(vhost)
    print(f"exchanges: {exchanges}")

    ret = rabbit.delete_vhosts(VHOST_NAME)
    print(f"delete vhost: {ret}")

    vhosts = rabbit.get_vhosts()
    print(f"vhosts: {vhosts}")


def test_apis():
    rabbit.create_vhosts("test_vhost")

    rabbit.create_exchange("test_vhost", "ex1")
    rabbit.create_exchange("test_vhost", "ex2")

    rabbit.create_queue("test_vhost", "q1")
    rabbit.create_queue("test_vhost", "q2")
    rabbit.create_queue("test_vhost", "q3")

    rabbit.create_binding("test_vhost", "ex1", "q1")
    rabbit.create_binding("test_vhost", "ex2", "q2", routing_key="hello")
    rabbit.create_binding("test_vhost", "ex2", "q3", routing_key="snowdeer_binding")


if __name__ == '__main__':
    main()
    # test_apis()

Python에서 RabbitMq 사용하기 (4) - RPC

|

RPC

지금까지의 예제는 단방향으로 메시지를 전송하고 끝나는 예제들이었습니다. 하지만, 함수처럼 결과를 기다려야 할 때는 RPC 패턴을 사용합니다.

개인적으로는 RabbitMQ의 RPC 보다는 Celery의 Backend Result가 더 좋은 것 같습니다. RabbitMQ나 Redis는 Message Queue로 사용하고, 여기에 Task Queue인 Celery를 같이 사용하는 편이 더 사용성이 좋은 것 같습니다.

암튼 RabbitMQ의 RPC를 살펴보면, RabbitMQ는 callback_queue를 이용해서 RPC 기능을 지원합니다. Client에서 Request 메시지를 보내면 Server에서는 Response 메시지를 Client에게 전송합니다. Client에서 Response 메시지를 받기 위해서는 Request 메시지를 전송할 때, 아래의 예제와 같이 callback queue의 주소를 같이 보내야 합니다.

result = channel.queue_declare(queue='', exclusive=True)
callback_queue = result.method.queue

channel.basic_publish(exchange='',
                      routing_key='rpc_queue',
                      properties=pika.BasicProperties(
                            reply_to = callback_queue,
                            ),
                      body=request)

위 방법으로 RPC를 구현하면 Request를 보낼 때마다 callback_queue를 매번 생성하기 때문에 비효율적인 면이 있습니다. 따라서 Client에서는 하나의 callback_queue를 만들어서 다양한 메시지의 응답을 수신하는 것이 더 유리합니다. 그러기 위해서는 propertiescorrelation_id라는 속성을 이용하면 됩니다. correlation_id는 Request마다 고유의 값을 가집니다. 그래서 하나의 callback_queue를 사용하더라도 Response가 어떤 Request에 대한 것인지 알 수 있습니다.

image

RPC 과정

따라서 RPC를 구현하는 과정은 다음과 같습니다.

  • Client 시작할 때, exclusive=True인 랜덤 이름의 Queue를 하나 생성
  • RPC Request를 보낼 때, reply_to, correlation_id 2개의 항목의 값을 채워서 전송. reply_tocallback_queue의 이름, correlation_id는 각 Request 마다 고유한 값으로 설정
  • Request는 rpc_queue에 전송됨
  • 서버는 메시지 수신 후 작업을 수행함. 그리고 결과는 reply_to 필드 이름의 Queue로 전송
  • Client는 callback_queue에 메시지가 도착하는 것을 기다리며, correlation_id 값을 확인 후 메시지 수신

rpc_server.py

import pika

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))

channel = connection.channel()

channel.queue_declare(queue='rpc_queue')

def fib(n):
    if n == 0:
        return 0
    elif n == 1:
        return 1
    else:
        return fib(n - 1) + fib(n - 2)

def on_request(ch, method, props, body):
    n = int(body)

    print(" [.] fib(%s)" % n)
    response = fib(n)

    ch.basic_publish(exchange='',
                     routing_key=props.reply_to,
                     properties=pika.BasicProperties(correlation_id = \
                                                         props.correlation_id),
                     body=str(response))
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='rpc_queue', on_message_callback=on_request)

print(" [x] Awaiting RPC requests")
channel.start_consuming()

rpc_client.py

import pika
import uuid

class FibonacciRpcClient(object):

    def __init__(self):
        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters(host='localhost'))

        self.channel = self.connection.channel()

        result = self.channel.queue_declare(queue='', exclusive=True)
        self.callback_queue = result.method.queue

        self.channel.basic_consume(
            queue=self.callback_queue,
            on_message_callback=self.on_response,
            auto_ack=True)

    def on_response(self, ch, method, props, body):
        if self.corr_id == props.correlation_id:
            self.response = body

    def call(self, n):
        self.response = None
        self.corr_id = str(uuid.uuid4())
        self.channel.basic_publish(
            exchange='',
            routing_key='rpc_queue',
            properties=pika.BasicProperties(
                reply_to=self.callback_queue,
                correlation_id=self.corr_id,
            ),
            body=str(n))
        while self.response is None:
            self.connection.process_data_events()
        return int(self.response)


fibonacci_rpc = FibonacciRpcClient()

print(" [x] Requesting fib(30)")
response = fibonacci_rpc.call(30)
print(" [.] Got %r" % response)