Python에서 RabbitMq 사용하기 (4) - RPC

|

RPC

지금까지의 예제는 단방향으로 메시지를 전송하고 끝나는 예제들이었습니다. 하지만, 함수처럼 결과를 기다려야 할 때는 RPC 패턴을 사용합니다.

개인적으로는 RabbitMQ의 RPC 보다는 Celery의 Backend Result가 더 좋은 것 같습니다. RabbitMQ나 Redis는 Message Queue로 사용하고, 여기에 Task Queue인 Celery를 같이 사용하는 편이 더 사용성이 좋은 것 같습니다.

암튼 RabbitMQ의 RPC를 살펴보면, RabbitMQ는 callback_queue를 이용해서 RPC 기능을 지원합니다. Client에서 Request 메시지를 보내면 Server에서는 Response 메시지를 Client에게 전송합니다. Client에서 Response 메시지를 받기 위해서는 Request 메시지를 전송할 때, 아래의 예제와 같이 callback queue의 주소를 같이 보내야 합니다.

result = channel.queue_declare(queue='', exclusive=True)
callback_queue = result.method.queue

channel.basic_publish(exchange='',
                      routing_key='rpc_queue',
                      properties=pika.BasicProperties(
                            reply_to = callback_queue,
                            ),
                      body=request)

위 방법으로 RPC를 구현하면 Request를 보낼 때마다 callback_queue를 매번 생성하기 때문에 비효율적인 면이 있습니다. 따라서 Client에서는 하나의 callback_queue를 만들어서 다양한 메시지의 응답을 수신하는 것이 더 유리합니다. 그러기 위해서는 propertiescorrelation_id라는 속성을 이용하면 됩니다. correlation_id는 Request마다 고유의 값을 가집니다. 그래서 하나의 callback_queue를 사용하더라도 Response가 어떤 Request에 대한 것인지 알 수 있습니다.

image

RPC 과정

따라서 RPC를 구현하는 과정은 다음과 같습니다.

  • Client 시작할 때, exclusive=True인 랜덤 이름의 Queue를 하나 생성
  • RPC Request를 보낼 때, reply_to, correlation_id 2개의 항목의 값을 채워서 전송. reply_tocallback_queue의 이름, correlation_id는 각 Request 마다 고유한 값으로 설정
  • Request는 rpc_queue에 전송됨
  • 서버는 메시지 수신 후 작업을 수행함. 그리고 결과는 reply_to 필드 이름의 Queue로 전송
  • Client는 callback_queue에 메시지가 도착하는 것을 기다리며, correlation_id 값을 확인 후 메시지 수신

rpc_server.py

import pika

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))

channel = connection.channel()

channel.queue_declare(queue='rpc_queue')

def fib(n):
    if n == 0:
        return 0
    elif n == 1:
        return 1
    else:
        return fib(n - 1) + fib(n - 2)

def on_request(ch, method, props, body):
    n = int(body)

    print(" [.] fib(%s)" % n)
    response = fib(n)

    ch.basic_publish(exchange='',
                     routing_key=props.reply_to,
                     properties=pika.BasicProperties(correlation_id = \
                                                         props.correlation_id),
                     body=str(response))
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='rpc_queue', on_message_callback=on_request)

print(" [x] Awaiting RPC requests")
channel.start_consuming()

rpc_client.py

import pika
import uuid

class FibonacciRpcClient(object):

    def __init__(self):
        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters(host='localhost'))

        self.channel = self.connection.channel()

        result = self.channel.queue_declare(queue='', exclusive=True)
        self.callback_queue = result.method.queue

        self.channel.basic_consume(
            queue=self.callback_queue,
            on_message_callback=self.on_response,
            auto_ack=True)

    def on_response(self, ch, method, props, body):
        if self.corr_id == props.correlation_id:
            self.response = body

    def call(self, n):
        self.response = None
        self.corr_id = str(uuid.uuid4())
        self.channel.basic_publish(
            exchange='',
            routing_key='rpc_queue',
            properties=pika.BasicProperties(
                reply_to=self.callback_queue,
                correlation_id=self.corr_id,
            ),
            body=str(n))
        while self.response is None:
            self.connection.process_data_events()
        return int(self.response)


fibonacci_rpc = FibonacciRpcClient()

print(" [x] Requesting fib(30)")
response = fibonacci_rpc.call(30)
print(" [.] Got %r" % response)

Python에서 RabbitMq 사용하기 (3) - Routing

|

Routing

Routing 기능은 제 기준에서 그리 많이 사용되진 않을 거 같아서 간단히만 공부합니다. 더 자세한 내용은 여기에서 확인할 수 있습니다.

이전 포스팅에서 설명했던 Publish/Subscribe 기능은 단순히 메시지를 모든 Subscriber에게 전송하는 기능입니다. 여기에 약간의 옵션을 추가해서, 특정 Subscriber는 특별한 메시지만 받도록 할 수 있습니다.

예를 들어, 앞선 예제의 Logging 시스템에서 Disk Logging 프로그램, Screen Logging 프로그램이 있을 때 Screen Logging 프로그램은 모든 메시지를 수신하고, Disk Logging 프로그램은 Disk의 용량 절약을 목적으로 Critical Error Message만 수신하고 싶은 경우에 Routing 기능을 사용할 수 있습니다.

이 때, routing_key 옵션을 이용해서 바인딩 키를 설정할 수 있습니다.

channel.queue_bind(exchange=exchange_name,
                   queue=queue_name,
                   routing_key='black')

바인딩 키는 Exchange의 타입에 영향을 받습니다. 타입이 fanout인 경우는 바인딩 키가 무시됩니다.

따라서, 아래 예제는 Exchange를 생성할 때, 타입을 direct로 설정했습니다.

image

위 그림에서 각 메시지는 바인딩 키(routing key)가 등록된 Queueㄹ 전송이 됩니다. 물론, 아래 그림과 같이 여러 개의 Queue에 동일한 바인딩 키를 바인딩해도 상관없습니다.

image

예제 코드

다음은 Exchangedirect 타입으로 생성하는 코드입니다.

channel.exchange_declare(exchange='direct_logs',
                         exchange_type='direct')                 

그리고 Publish 할 때 아래와 같이 routing_key에 값을 추가해서 메시지를 전송하면 됩니다.

channel.basic_publish(exchange='direct_logs',
                      routing_key=severity,
                      body=message)

Logging 시스템 예제 코드

image

log_emitter.py

import pika
import sys

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs', exchange_type='direct')

severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(
    exchange='direct_logs', routing_key=severity, body=message)
print(" [x] Sent %r:%r" % (severity, message))
connection.close()

log_receiver.py

import pika
import sys

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs', exchange_type='direct')

result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue

severities = sys.argv[1:]
if not severities:
    sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
    sys.exit(1)

for severity in severities:
    channel.queue_bind(
        exchange='direct_logs', queue=queue_name, routing_key=severity)

print(' [*] Waiting for logs. To exit press CTRL+C')


def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))


channel.basic_consume(
    queue=queue_name, on_message_callback=callback, auto_ack=True)

channel.start_consuming()

Topic

그 외 아래 그림처럼 다양하고 자유로운 형식의 routing_key를 활용하기 위해서는 exchange_typetopic으로 할 수 있습니다.

image

routing_key<celerity>.<colour>.<species>와 같은 형태로 하고, 각 Queue의 바인딩 키를 *.orange.*, *.*.rabbit, lazy.# 등으로 설정할 수 있습니다.

Python에서 RabbitMq 사용하기 (2) - Publish/Subscribe

|

Publish/Subscribe

Exchange

본격적인 Exchange에 대한 내용이 등장합니다. Publishe/Subscribe의 핵심은 하나의 메시지를 관심이 있는 여러 개의 Subscriber에게 전달하는 것입니다.

아래의 예시는 Logging 시스템이며, 하나의 프로그램에서 Log 메시지를 전송하면, Disk에 Log을 기록하는 프로그램과 화면에 Log를 출력하는 프로그램이 각각 메시지를 수신합니다.

image

앞서 포스팅에서 Producer는 메시지를 절대 Queue에 직접 전송하지 않는다는 내용이 있습니다. Producer는 Exchange에게 메시지를 전송하며, Exchange에서 관련이 있는 Queue에 메시지를 넣어 줍니다.

Exchange는 아래와 같이 선언 가능합니다.

channel.exchange_declare(exchange='logs',
                         exchange_type='fanout')

이 때 exchange_type는 다음과 같은 값을 가질 수 있습니다.

  • direct
  • topic
  • headers
  • fanout

여기에서 fanoutExchange가 알고 있는 Queue들에게 메시지를 펼쳐서 전송하는 옵션으로 예시로 든 Logging 시스템에 적합한 타입입니다.

기존 포스팅에서 메시지를 전송하는 코드는 다음과 같았습니다.

channel.basic_publish(exchange='',
                      routing_key='hello',
                      body=message)

exchange 값이 입력되지 않을 경우, default exchange를 사용하게 됩니다. 그리고, 메시지는 routing_key 값의 이름을 갖는 Queue에 전달됩니다.

Temporary queues

Publish/Subscribe 방식으로 동작할 때 여러 개의 Subscriber를 동작시키려면 Queue가 여러 개 필요합니다. 이럴 때 여러 개의 Queue를 관리하는 것이 어려울 수 있습니다. 위의 예시에서 Disk Logging 프로그램과 Screen Logging 프로그램마다 각각의 Queue를 사용해야 하는데, 각 Queue의 이름을 queue-disk-logging, queue-screen-logging와 같이 정할 수도 있지만, 새로운 프로그램이 추가될 때마다 Queue의 이름을 계속 짓는 것은 불편한 일이 될 수 있습니다.

만약 Temporary queues 기능을 이용하면 Queue의 이름을 랜덤으로 지을 수 있습니다. 또한 exclusive=True 옵션을 이용해서 Subscriber가 종료되면 해당 Queue를 자동으로 없어지도록 할 수 있습니다.

Queue의 이름을 랜덤으로 짓는 방법은 다음과 같습니다.

result = channel.queue_declare(queue='', exclusive=True)

이러면 랜덤으로 지어진 Queue의 이름은 result.method.queue으로 리턴되며, amq.gen-JzTY20BRgKO-HjmUJj0wLg와 같은 랜덤한 이름이 됩니다. 또한 exclusive=True 옵션에 의해 Subscriber가 종료되면 해당 Queue를 자동으로 지우도록 합니다.

Binding

ExchangeQueue를 연결하는 작업입니다. 아래와 같은 코드를 작성하면 log라는 이름의 Exchange와 랜덤으로 지어진 Queue를 연결합니다. 나중에 log 이름의 Exchange에 메시지가 들어오면 바인딩된 Queue들이 해당 메시지를 수신하게 됩니다.

channel.queue_bind(exchange='logs',
                   queue=result.method.queue)

image

publisher.py

아래는 Log 메시지를 Publish하는 예제입니다.

import pika
import sys

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='logs', exchange_type='fanout')

message = ' '.join(sys.argv[1:]) or "info: Hello World!"
channel.basic_publish(exchange='logs', routing_key='', body=message)
print(" [x] Sent %r" % message)
connection.close()

기존 예제와 달라진, 눈여겨 볼 부분은 다음 코드입니다.

channel.basic_publish(exchange='logs', routing_key='', body=message)

기존에는 exchange의 값이 비어있었고, routing_key 값에 Queue의 이름이 들어갔으나, 지금 예제는 exchange의 값이 채워졌고, routing_key 값은 비어있습니다.

subscriber.py

아래는 메시지를 Subscription하는 예제입니다.

import pika

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='logs', exchange_type='fanout')

result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue

channel.queue_bind(exchange='logs', queue=queue_name)

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] %r" % body)

channel.basic_consume(
    queue=queue_name, on_message_callback=callback, auto_ack=True)

channel.start_consuming()

여기에서 눈여겨 볼 부분은 아래의 코드입니다.

result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue

channel.queue_bind(exchange='logs', queue=queue_name)

랜덤한 이름의 Queue를 하나 생성하고, 그 Queue를 logs라는 이름의 exchange에 할당하는 코드입니다.

Python에서 RabbitMq 사용하기 (1) - RabbitMQ 기본 동작

|

RabbitMQ 기본 동작

RabbitMQ는 프로그램간 메시지를 쉽게 주고 받을 수 있도록 하는 Message Queue입니다. 기본적으로 Producer에서 생성한 Message를 Queue를 이용해서 Consumer에게 전달하는 Producer-Consumer 패턴으로 되어 있으며, Producer와 Consumer간 느슨한 결합을 할 수 있게 해줍니다.

기본적인 동작을 대략적인 그림으로 표현하면 다음과 같습니다.

image

하지만, 실제 동작은 위 그림과는 조금 다릅니다. RabbitMQ에서는 Producer에서 Queue로 직접 Message를 보내는 경우는 없습니다. Message는 Exchange라는 요소를 거쳐서 Queue에 전달되지만, Exchange에 대한 내용은 후반에 다룰 예정이기 때문에 위 그림에서는 생략되어 있습니다.

sender.py

sender는 정의한 Queue에 메시지를 전송합니다. 실제로는 직접적으로 Queue에 메시지를 넣는 것이 아니라 Exchange에 메시지를 입력하지만, 아래 예제에서는 Exchange를 default 값으로 지정했습니다.

메시지를 전송하는 코드는 다음과 같습니다.

channel.basic_publish(exchange='', routing_key=QUEUE_NAME, body=msg)

전체 코드는 다음과 같습니다.

from datetime import datetime
import pika

HOST_NAME = "localhost"
QUEUE_NAME = "snowdeer_queue"


def main():
    connection = pika.BlockingConnection(pika.ConnectionParameters(host=HOST_NAME))
    channel = connection.channel()

    channel.queue_declare(queue=QUEUE_NAME)

    msg = f"[{datetime.now()}] hello, snowdeer !!"
    channel.basic_publish(exchange='', routing_key=QUEUE_NAME, body=msg)
    print(f"Sent message.\n{msg}")
    connection.close()


if __name__ == '__main__':
    main()

receiver.py

메시지를 수신하는 예제입니다. callback 함수를 통해 메시지를 수신하며, 메시지를 대기하는 부분은 다음과 같습니다.

channel.start_consuming()

전체 코드입니다.

import pika

HOST_NAME = "localhost"
QUEUE_NAME = "snowdeer_queue"


def main():
    connection = pika.BlockingConnection(pika.ConnectionParameters(host=HOST_NAME))
    channel = connection.channel()

    channel.queue_declare(queue=QUEUE_NAME)

    def callback(ch, method, properties, body):
        print("Message is Arrived %r" % body)

    channel.basic_consume(queue=QUEUE_NAME,
                          on_message_callback=callback,
                          auto_ack=True)

    try:
        print("Waiting for messages.")
        channel.start_consuming()
    except KeyboardInterrupt:
        print('Ctrl+C is Pressed.')


if __name__ == '__main__':
    main()

Message의 생명 주기

만약 RabbitMQ의 Queue에 메시지가 입력되었는데, Consumer가 메시지를 가져가기 전에 RabbitMQ가 재실행되는 상황이 발생하게 된다면, Queue의 메시지가 삭제됩니다. 중요한 메시지라면 메시지의 내구성이 매우 중요하며, 아래 명령어를 이용해서 Queue에 입력된 메시지를 Disk에 저장하여 메시지의 내구성을 높일 수 있습니다.

channel.queue_declare(queue='task_queue', durable=True)

위 명령어는 기존에 존재하지 않던 Queue를 새로 정의할 때만 적용되며, 이미 정의되어있는 Queue에 대해서는 동작하지 않습니다.

그리고 메시지를 전송할 때도 다음과 같이 내구성을 적용할 수 있습니다.

channel.basic_publish(exchange='',
                      routing_key="task_queue",
                      body=message,
                      properties=pika.BasicProperties(
                         delivery_mode = pika.spec.PERSISTENT_DELIVERY_MODE
                      ))

하나의 Queue에 복수 Receiver 연결

만약 하나의 Queue에 여러 개의 Receiver를 연결하면 어떻게 될까요? 각 Receiver들이 차례대로 하나씩 메시지를 가져갑니다.

image

이 빈도는 아래 명령어를 이용해서 조정할 수 있습니다.

channel.basic_qos(prefetch_count=1)

M1에 brew 실행 오류 발생 해결법

|

M1에 brew 실행 오류 발생 해결법

M1 맥북에서 brew 명령어를 실행했을 때 오류가 발생한 경우에는 brew doctor로 문제를 해결해줍니다. 저같은 경우는 brew install tree에서 오류가 발생했네요.

$ brew install tree

Warning: No available formula with the name "tree".
==> Searching for similarly named formulae...
Error: No similarly named formulae found.
==> Searching for a previously deleted formula (in the last month)...
Error: No previously deleted formula found.
==> Searching taps on GitHub...
Error: No formulae found in taps.

이 경우 brew doctor를 실행하면 다음과 같이 현재 문제점을 알려줍니다.

$ brew doctor

Please note that these warnings are just used to help the Homebrew maintainers
with debugging if you file an issue. If everything you use Homebrew for is
working fine: please don't worry or file an issue; just ignore this. Thanks!

Warning: A newer Command Line Tools release is available.
Update them from Software Update in System Preferences or run:
  softwareupdate --all --install --force

If that doesn't show you any updates, run:
  sudo rm -rf /Library/Developer/CommandLineTools
  sudo xcode-select --install

Alternatively, manually download them from:
  https://developer.apple.com/download/all/.
You should download the Command Line Tools for Xcode 13.1.


Warning: Homebrew/homebrew-core was not tapped properly! Run:
  rm -rf "/opt/homebrew/Library/Taps/homebrew/homebrew-core"
  brew tap homebrew/core

Warning: Some taps are not on the default git origin branch and may not receive
updates. If this is a surprise to you, check out the default branch with:
  git -C $(brew --repo homebrew/core) checkout master

저 같은 경우 Homebrew/homebrew-core was not tapped 문제가 있어 다음 명령어를 실행했습니다.

$ rm -rf "/opt/homebrew/Library/Taps/homebrew/homebrew-core"
$ brew tap homebrew/core