Python에서 RabbitMq 사용하기 (5) - RabbitMq Utility
24 Dec 2021 | Python RabbitMQ
RabbitMq Utility
rabbitmq_util.py
from base64 import b64encode
from http.client import HTTPConnection
import json
from urllib.parse import quote


class RabbitMqUtil:
    """Small client for the RabbitMQ management HTTP API.

    Every call opens a fresh ``HTTPConnection`` (5 second timeout), sends one
    request with an HTTP Basic ``Authorization`` header and a JSON
    ``content-type`` header, and closes the connection again.

    Methods that create, delete or purge a resource return ``True`` when the
    server answers 201 or 204 and ``False`` for any other status code.
    """

    __OVERVIEW_URL = "/api/overview"
    __VHOSTS_URL = "/api/vhosts"
    __USERS_URL = "/api/users"
    __EXCHANGES_URL = "/api/exchanges"
    __QUEUES_URL = "/api/queues"
    __BINDINGS_URL = "/api/bindings"

    # Status codes the write-style methods treat as success.
    __SUCCESS_CODES = (201, 204)

    def __init__(self, host="localhost", port=15672, user="guest", passwd="guest"):
        """Store the endpoint and pre-build the request headers.

        Args:
            host: Host name of the management API.
            port: TCP port of the management API.
            user: User name for HTTP Basic authentication.
            passwd: Password for HTTP Basic authentication.
        """
        self.__host = host
        self.__port = port

        encoded_auth = b64encode(f"{user}:{passwd}".encode()).decode()
        self.__header = {
            "Authorization": f"Basic {encoded_auth}",
            "content-type": "application/json",
        }

    def __request(self, method, url, body=None):
        """Send one HTTP request and return ``(status_code, raw_body_bytes)``.

        The connection and the response are closed even when sending or
        reading raises, so a failing call does not leak a socket.
        """
        connection = HTTPConnection(self.__host, self.__port, timeout=5)
        try:
            connection.request(method=method, url=url, headers=self.__header, body=body)
            response = connection.getresponse()
            try:
                return response.status, response.read()
            finally:
                response.close()
        finally:
            connection.close()

    @staticmethod
    def __path(base, *segments):
        """Join *segments* onto *base*, percent-encoding each segment fully.

        ``safe=""`` makes ``/`` inside a name (e.g. the default vhost ``/``)
        become ``%2F`` instead of splitting the path.
        """
        return "/".join([base, *(quote(segment, "") for segment in segments)])

    def __names(self, url):
        """GET *url* and return the ``name`` field of every item in the JSON list."""
        _, content = self.__request("GET", url)
        return [item["name"] for item in json.loads(content)]

    def __write(self, method, url, body=None):
        """Send a modifying request and report whether the status is 201 or 204."""
        code, _ = self.__request(method, url, body)
        return code in RabbitMqUtil.__SUCCESS_CODES

    def get_overview(self):
        """Return ``(status_code, raw_body_bytes)`` for the overview endpoint."""
        return self.__request("GET", RabbitMqUtil.__OVERVIEW_URL)

    def get_users(self):
        """Return the names of all users as a list of strings."""
        return self.__names(RabbitMqUtil.__USERS_URL)

    def get_vhosts(self):
        """Return the names of all virtual hosts as a list of strings."""
        return self.__names(RabbitMqUtil.__VHOSTS_URL)

    def create_vhosts(self, name):
        """Create the virtual host *name*; return ``True`` on 201/204."""
        return self.__write("PUT", self.__path(RabbitMqUtil.__VHOSTS_URL, name))

    def delete_vhosts(self, name):
        """Delete the virtual host *name*; return ``True`` on 201/204."""
        return self.__write("DELETE", self.__path(RabbitMqUtil.__VHOSTS_URL, name))

    def get_exchanges(self, vhost):
        """Return the names of the exchanges in *vhost*."""
        return self.__names(self.__path(RabbitMqUtil.__EXCHANGES_URL, vhost))

    def create_exchange(self, vhost, exchange, exchange_type="fanout", auto_delete=False,
                        durable=True, internal=False, arguments=None):
        """Declare *exchange* in *vhost*; return ``True`` on 201/204.

        Args:
            vhost: Virtual host name.
            exchange: Exchange name.
            exchange_type: Sent as the ``type`` field of the request body.
            auto_delete: Sent as the ``auto_delete`` field.
            durable: Sent as the ``durable`` field.
            internal: Sent as the ``internal`` field.
            arguments: Optional mapping sent as ``arguments``. When omitted an
                empty JSON object is sent, matching what ``create_binding``
                sends for its own ``arguments`` field.
        """
        body = json.dumps({
            "type": exchange_type,
            "auto_delete": auto_delete,
            "durable": durable,
            "internal": internal,
            "arguments": arguments or {},
        })
        url = self.__path(RabbitMqUtil.__EXCHANGES_URL, vhost, exchange)
        return self.__write("PUT", url, body)

    def delete_exchange(self, vhost, exchange):
        """Delete *exchange* from *vhost*; return ``True`` on 201/204."""
        url = self.__path(RabbitMqUtil.__EXCHANGES_URL, vhost, exchange)
        return self.__write("DELETE", url)

    def get_queues(self, vhost):
        """Return the names of the queues in *vhost*."""
        return self.__names(self.__path(RabbitMqUtil.__QUEUES_URL, vhost))

    def create_queue(self, vhost, queue, **kwargs):
        """Declare *queue* in *vhost*; return ``True`` on 201/204.

        Keyword arguments are serialized unchanged as the JSON request body.
        """
        url = self.__path(RabbitMqUtil.__QUEUES_URL, vhost, queue)
        return self.__write("PUT", url, json.dumps(kwargs))

    def delete_queue(self, vhost, queue):
        """Delete *queue* from *vhost*; return ``True`` on 201/204."""
        url = self.__path(RabbitMqUtil.__QUEUES_URL, vhost, queue)
        return self.__write("DELETE", url)

    def purge_queue(self, vhost, queue):
        """Delete the contents of *queue*; return ``True`` on 201/204."""
        url = self.__path(RabbitMqUtil.__QUEUES_URL, vhost, queue, "contents")
        return self.__write("DELETE", url)

    def get_bindings(self, vhost):
        """Return the raw response body (bytes) listing the bindings in *vhost*."""
        _, content = self.__request("GET", self.__path(RabbitMqUtil.__BINDINGS_URL, vhost))
        return content

    def get_queue_bindings(self, vhost, queue):
        """Return the raw response body (bytes) listing the bindings on *queue*."""
        url = self.__path(RabbitMqUtil.__QUEUES_URL, vhost, queue, "bindings")
        _, content = self.__request("GET", url)
        return content

    def create_binding(self, vhost, exchange, queue, routing_key="", args=None):
        """Bind *queue* to *exchange*; return ``True`` on 201/204.

        Args:
            vhost: Virtual host name.
            exchange: Source exchange name.
            queue: Destination queue name.
            routing_key: Sent as the ``routing_key`` field of the body.
            args: Optional mapping sent as ``arguments`` (empty object if omitted).
        """
        body = json.dumps({"routing_key": routing_key, "arguments": args or {}})
        url = self.__path(RabbitMqUtil.__BINDINGS_URL, vhost, "e", exchange, "q", queue)
        return self.__write("POST", url, body)

    def delete_binding(self, vhost, exchange, queue, routing_key):
        """Delete one binding between *exchange* and *queue*; return ``True`` on 201/204.

        *routing_key* is percent-encoded before it is placed in the URL, so
        keys containing characters such as ``#`` or ``/`` no longer corrupt
        the path.

        An empty key is sent as ``~``; without it the URL would end in a bare
        ``/`` and address the list of bindings instead of a single one.
        NOTE(review): ``~`` is believed to be the management API's
        ``properties_key`` for a binding with an empty routing key and no
        arguments - confirm against the target broker version.
        """
        props = routing_key if routing_key else "~"
        url = self.__path(RabbitMqUtil.__BINDINGS_URL, vhost, "e", exchange, "q", queue, props)
        return self.__write("DELETE", url)
main.py
테스트용 코드로 별도로 필요하진 않습니다.
from pyrabbit.rabbitmq_util import RabbitMqUtil

# Shared client used by every helper below; targets "localhost" with the
# client's default port and credentials.
rabbit = RabbitMqUtil("localhost")

VHOST_NAME = "snowdeer_vhost"
EXCHANGE_NAME = "snowdeer_exchange"
QUEUE_NAME = "snowdeer_queue"


def main():
    """Run the demo: overview, users, vhost, exchange, queue and binding checks."""
    print("<OVERVIEW>")
    overview = rabbit.get_overview()
    print(f"overview: {overview}")

    test_users()
    test_vhosts(VHOST_NAME)
    test_exchanges(VHOST_NAME, EXCHANGE_NAME)
    test_queues(VHOST_NAME, QUEUE_NAME)
    test_bindings(VHOST_NAME, EXCHANGE_NAME, QUEUE_NAME)

    # Uncomment to remove everything the steps above created.
    # delete_resources(VHOST_NAME, EXCHANGE_NAME, QUEUE_NAME)


def test_users():
    """Print the user names reported by the server."""
    print("\n<TEST USERS>")
    users = rabbit.get_users()
    print(f"users: {users}")


def test_vhosts(vhost_name):
    """Print the vhosts, create *vhost_name*, then print the vhosts again."""
    print("\n<TEST VHOSTS>")
    vhosts = rabbit.get_vhosts()
    print(f"vhosts: {vhosts}")

    ret = rabbit.create_vhosts(vhost_name)
    print(f"create vhost: {ret}")

    vhosts = rabbit.get_vhosts()
    print(f"vhosts: {vhosts}")


def test_exchanges(vhost, exchange):
    """Print the exchanges of *vhost*, create *exchange*, then print them again."""
    print("\n<TEST EXCHANGES>")
    exchanges = rabbit.get_exchanges(vhost)
    print(f"exchanges: {exchanges}")

    ret = rabbit.create_exchange(vhost, exchange)
    print(f"create exchanges: {ret}")

    exchanges = rabbit.get_exchanges(vhost)
    print(f"exchanges: {exchanges}")


def test_queues(vhost, queue):
    """Print the queues of *vhost*, create *queue*, print them again, then purge it."""
    print("\n<TEST QUEUES>")
    queues = rabbit.get_queues(vhost)
    print(f"queues: {queues}")

    ret = rabbit.create_queue(vhost, queue)
    print(f"create queue: {ret}")

    queues = rabbit.get_queues(vhost)
    print(f"queues: {queues}")

    ret = rabbit.purge_queue(vhost, queue)
    print(f"purge queue: {ret}")


def test_bindings(vhost, exchange, queue):
    """Print bindings before and after binding *queue* to *exchange* with key "hello"."""
    print("\n<TEST BINDINGS>")
    # Labels read "bindings:" because these calls list bindings, not queues.
    bindings = rabbit.get_bindings(vhost)
    print(f"bindings: {bindings}")
    bindings = rabbit.get_queue_bindings(vhost, queue)
    print(f"bindings: {bindings}")

    ret = rabbit.create_binding(vhost, exchange, queue, routing_key="hello")
    print(f"create binding: {ret}")

    bindings = rabbit.get_bindings(vhost)
    print(f"bindings: {bindings}")
    bindings = rabbit.get_queue_bindings(vhost, queue)
    print(f"bindings: {bindings}")


def delete_resources(vhost, exchange, queue):
    """Delete *queue*, *exchange* and finally *vhost*, printing the state after each step."""
    ret = rabbit.delete_queue(vhost, queue)
    print(f"delete queue: {ret}")
    queues = rabbit.get_queues(vhost)
    print(f"queues: {queues}")

    ret = rabbit.delete_exchange(vhost, exchange)
    print(f"delete exchanges: {ret}")
    exchanges = rabbit.get_exchanges(vhost)
    print(f"exchanges: {exchanges}")

    # Delete the vhost that was passed in, not the module-level constant.
    ret = rabbit.delete_vhosts(vhost)
    print(f"delete vhost: {ret}")
    vhosts = rabbit.get_vhosts()
    print(f"vhosts: {vhosts}")


def test_apis():
    """Create a sample vhost with two exchanges, three queues and three bindings."""
    rabbit.create_vhosts("test_vhost")
    rabbit.create_exchange("test_vhost", "ex1")
    rabbit.create_exchange("test_vhost", "ex2")
    rabbit.create_queue("test_vhost", "q1")
    rabbit.create_queue("test_vhost", "q2")
    rabbit.create_queue("test_vhost", "q3")
    rabbit.create_binding("test_vhost", "ex1", "q1")
    rabbit.create_binding("test_vhost", "ex2", "q2", routing_key="hello")
    rabbit.create_binding("test_vhost", "ex2", "q3", routing_key="snowdeer_binding")


if __name__ == '__main__':
    main()
    # test_apis()