Flutter Canvas 그리기 예제 - (1) 배경 색상 그리기

|

Canvas 사용하기

Flutter에서 Canvas를 사용하기 위해서는 CustomPainter라는 위젯을 상속받아 구현해야 합니다. 그리고 void paint(...) 함수와 bool shouldRepaint(...) 함수를 오버라이드해서 구현해주면 됩니다.

BackgroundPainter 클래스

class BackgroundPainter extends CustomPainter {
  @override
  void paint(Canvas canvas, Size size) {
    var background = Paint()
      ..style = PaintingStyle.fill
      ..color = Colors.lime
      ..isAntiAlias = true;

    Rect rect = Rect.fromLTWH(0, 0, size.width, size.height);
    canvas.drawRect(rect, background);
  }

  @override
  bool shouldRepaint(CustomPainter oldDelegate) => false;
}

void paint(Canvas canvas, Size size) 메소드의 인자로 넘어오는 size는 현재 CustomPainter의 크기가 들어옵니다.

EmptyPage 클래스

그리고 위에서 구현한 BackgroundPainter를 화면에 그리기 위해서는 다음과 같이 CustomPainterpaint 속성에 위에서 만든 객체를 배치하면 됩니다.

class EmptyPage extends StatelessWidget {
  const EmptyPage({Key key}) : super(key: key);

  @override
  Widget build(BuildContext context) {
    return Scaffold(
      body: CustomPaint(
        child: Container(),
        painter: BackgroundPainter(),
      ),
    );
  }
}

여기서 주의할 점은 CustomPaint 속성 중 child: Container()를 지정해주어야 BackgroundPainter 클래스의 void paint(Canvas canvas, Size size) 함수 인자인 size 크기가 전달됩니다. 만약 child 속성이 없다면 size는 (0, 0)이 됩니다.

전체 코드

import 'package:flutter/material.dart';

class EmptyPage extends StatelessWidget {
  const EmptyPage({Key key}) : super(key: key);

  @override
  Widget build(BuildContext context) {
    return Scaffold(
      body: CustomPaint(
        child: Container(),
        painter: BackgroundPainter(),
      ),
    );
  }
}

class BackgroundPainter extends CustomPainter {
  @override
  void paint(Canvas canvas, Size size) {
    var background = Paint()
      ..style = PaintingStyle.fill
      ..color = Colors.lime
      ..isAntiAlias = true;

    Rect rect = Rect.fromLTWH(0, 0, size.width, size.height);
    canvas.drawRect(rect, background);
  }

  @override
  bool shouldRepaint(CustomPainter oldDelegate) => false;
}

main.dart

</pre class=”prettyprint”> import ‘package:flutter/material.dart’;

import ‘canvas/01_background_color.dart’;

void main() { runApp(MyApp()); }

class MyApp extends StatelessWidget { @override Widget build(BuildContext context) { return MaterialApp( title: ‘Flutter Demo’, debugShowCheckedModeBanner: false, theme: ThemeData( primarySwatch: Colors.blue, visualDensity: VisualDensity.adaptivePlatformDensity, ), home: EmptyPage(), ); } } </pre>

실행 화면

image

Python Blocking Socket 및 Non-Blocking Socket 예제

|

Python Blocking Socket 및 Non-Blocking Socket 예제

Blocking Socket 예제

다음 코드는 httpbin.org/delay/3 서브도메인에 접속해서 3초 딜레이 이후 응답을 받는 코드입니다. s.recv 코드에서 3초동안 Blocking 되서 동작합니다.

import socket

s = socket.create_connection(('httpbin.org', 80))
s.sendall(b'GET /delay/3 HTTP/1.1\r\nHost: httpbin.org\r\n\r\n')

buf = s.recv(1024)
print(buf)

Non-Blocking Socket 예제

다음은 Non-Blocking으로 동작하기 때문에 중간에 while 문으로 응답이 완료될 때까지 대기합니다.

import select
import socket

s = socket.create_connection(('httpbin.org', 80))
s.setblocking(False)

s.sendall(b'GET /delay/3 HTTP/1.1\r\nHost: httpbin.org\r\n\r\n')

while True:
    ready_to_read, ready_to_write, in_error = select.select([s], [], [])
    if s in ready_to_read:
        break

buf = s.recv(1024)
print(buf)

select

여기에서 사용한 select는 여러 개의 이벤트 소스를 결합하여 쉽게 모니터링할 수 있는 기술이며, 오래된 기술입니다. 그래서 최상의 성능을 보여주지는 못하며 리눅스의 epoll이나 FreeBSD의 kqueue와 같이 운영체제마다 다른 대안과 최적화를 구현해서 사용하고 있습니다.

파이썬에서는 다음 포스팅에서 다룰 asyncio라는 추상화 계층을 이용해서 사용하는 것이 더 좋습니다.

Python Socket으로 HTTP Request 보내는 방법

|

Socket으로 HTTP Request 보내는 예제 코드

HTTP 1.1 부터는 Header 정보에 Host 정보를 넣어줘야 하며, 마지막 부분에 2개의 \r\n이 필요합니다.

다음 2가지 예제는 모두 같은 코드입니다.

import socket

s = socket.create_connection(('httpbin.org', 80))
s.sendall(b'GET / HTTP/1.1\r\nHost: httpbin.org\r\n\r\n')

buf = s.recv(1024)
print(buf)
import socket

s = socket.create_connection(('httpbin.org', 80))
s.sendall(b'''GET / HTTP/1.1
Host: httpbin.org

''')

buf = s.recv(1024)
print(buf)

Python futures 예제

|

Futures

파이썬 3.2에 도입된 concurrent.futures 모듈이며, 나중에 파이썬 2에도 적용되었습니다. 파이썬은 concurrent.futures.ThreadPoolExecutorconcurrent.futures.Process.PoolExecutor의 2종류의 Executor를 지원합니다. 각각 Thread 기반과 Process 기반입니다.

간단한 예제

import time
from concurrent import futures


def loop(name, step):
    __sum = 0
    for i in range(0, 10):
        __sum = __sum + step
        print(f'[{name}] is looping({i}) ...')
        time.sleep(0.1)

    return __sum


if __name__ == '__main__':
    print(f'calculating ...')
    with futures.ThreadPoolExecutor(max_workers=3) as executor:
        f1 = executor.submit(loop, 'apple', 1)
        f2 = executor.submit(loop, 'bread', 2)
        f3 = executor.submit(loop, 'carrot', 3)
        f4 = executor.submit(loop, 'david', 4)
        f5 = executor.submit(loop, 'egg', 5)

    print(f'')
    print(f'result #1 : {f1.result()}')
    print(f'result #2 : {f2.result()}')
    print(f'result #3 : {f3.result()}')
    print(f'result #4 : {f4.result()}')
    print(f'result #5 : {f5.result()}')
</pre>

Python에서 RabbitMq 사용하기 (5) - RabbitMq Utlity

|

RabbitMq Utlity

rabbitmq_util.py

from http.client import HTTPConnection
import json
from urllib.parse import quote
from base64 import b64encode


class RabbitMqUtil:
    __OVERVIEW_URL = "/api/overview"
    __VHOSTS_URL = "/api/vhosts"
    __USERS_URL = "/api/users"
    __EXCHANGES_URL = "/api/exchanges"
    __QUEUES_URL = "/api/queues"
    __BINDINGS_URL = "/api/bindings"

    def __init__(self, host="localhost", port=15672, user="guest", passwd="guest"):
        self.__host = host
        self.__port = port
        encoded_auth = b64encode(f"{user}:{passwd}".encode()).decode()
        self.__header = {"Authorization": f"Basic {encoded_auth}",
                         "content-type": "application/json"}

    def __request(self, method, url, body=None):
        connection = HTTPConnection(self.__host, self.__port, timeout=5)
        connection.request(method=method,
                           url=url,
                           headers=self.__header,
                           body=body)
        response = connection.getresponse()
        code = response.getcode()
        content = response.read()
        response.close()
        connection.close()

        return code, content

    def get_overview(self):
        return self.__request("GET", RabbitMqUtil.__OVERVIEW_URL)

    def get_users(self):
        result = []
        code, content = self.__request("GET", RabbitMqUtil.__USERS_URL)
        users = json.loads(content)
        for item in users:
            result.append(item["name"])

        return result

    def get_vhosts(self):
        result = []
        code, content = self.__request("GET", RabbitMqUtil.__VHOSTS_URL)
        vhosts = json.loads(content)
        for item in vhosts:
            result.append(item["name"])

        return result

    def create_vhosts(self, name):
        name = quote(name, "")
        code, content = self.__request("PUT", f"{RabbitMqUtil.__VHOSTS_URL}/{name}")

        return (code == 201) or (code == 204)

    def delete_vhosts(self, name):
        name = quote(name, "")
        code, content = self.__request("DELETE", f"{RabbitMqUtil.__VHOSTS_URL}/{name}")

        return (code == 201) or (code == 204)

    def get_exchanges(self, vhost):
        result = []
        vhost = quote(vhost, "")
        code, content = self.__request("GET", f"{RabbitMqUtil.__EXCHANGES_URL}/{vhost}")
        exchanges = json.loads(content)
        for item in exchanges:
            result.append(item["name"])

        return result

    def create_exchange(self, vhost, exchange,
                        exchange_type="fanout",
                        auto_delete=False,
                        durable=True,
                        internal=False,
                        arguments=None):
        vhost = quote(vhost, '')
        exchange = quote(exchange, '')
        base_body = {"type": exchange_type,
                     "auto_delete": auto_delete,
                     "durable": durable,
                     "internal": internal,
                     "arguments": arguments or list()}
        body = json.dumps(base_body)
        code, content = self.__request("PUT", f"{RabbitMqUtil.__EXCHANGES_URL}/{vhost}/{exchange}", body)

        return (code == 201) or (code == 204)

    def delete_exchange(self, vhost, exchange):
        vhost = quote(vhost, '')
        exchange = quote(exchange, '')
        code, content = self.__request("DELETE", f"{RabbitMqUtil.__EXCHANGES_URL}/{vhost}/{exchange}")

        return (code == 201) or (code == 204)

    def get_queues(self, vhost):
        result = []
        vhost = quote(vhost, "")
        code, content = self.__request("GET", f"{RabbitMqUtil.__QUEUES_URL}/{vhost}")
        exchanges = json.loads(content)
        for item in exchanges:
            result.append(item["name"])

        return result

    def create_queue(self, vhost, queue, **kwargs):
        vhost = quote(vhost, '')
        queue = quote(queue, '')
        body = json.dumps(kwargs)
        code, content = self.__request("PUT", f"{RabbitMqUtil.__QUEUES_URL}/{vhost}/{queue}", body)

        return (code == 201) or (code == 204)

    def delete_queue(self, vhost, queue):
        vhost = quote(vhost, '')
        queue = quote(queue, '')
        code, content = self.__request("DELETE", f"{RabbitMqUtil.__QUEUES_URL}/{vhost}/{queue}")

        return (code == 201) or (code == 204)

    def purge_queue(self, vhost, queue):
        vhost = quote(vhost, '')
        queue = quote(queue, '')
        code, content = self.__request("DELETE", f"{RabbitMqUtil.__QUEUES_URL}/{vhost}/{queue}/contents")

        return (code == 201) or (code == 204)

    def get_bindings(self, vhost):
        vhost = quote(vhost, '')
        code, content = self.__request("GET", f"{RabbitMqUtil.__BINDINGS_URL}/{vhost}")

        return content

    def get_queue_bindings(self, vhost, queue):
        vhost = quote(vhost, '')
        queue = quote(queue, '')
        code, content = self.__request("GET", f"{RabbitMqUtil.__QUEUES_URL}/{vhost}/{queue}/bindings")

        return content

    def create_binding(self, vhost, exchange, queue, routing_key="", args=None):
        vhost = quote(vhost, '')
        exchange = quote(exchange, '')
        queue = quote(queue, '')
        body = json.dumps({"routing_key": routing_key, "arguments": args or {}})
        code, content = self.__request("POST",
                                       f"{RabbitMqUtil.__BINDINGS_URL}/{vhost}/e/{exchange}/q/{queue}",
                                       body)

        return (code == 201) or (code == 204)

    def delete_binding(self, vhost, exchange, queue, routing_key):
        vhost = quote(vhost, '')
        exchange = quote(exchange, '')
        queue = quote(queue, '')
        body = ''
        code, content = self.__request("DELETE",
                                       f"{RabbitMqUtil.__BINDINGS_URL}/{vhost}/e/{exchange}/q/{queue}/{routing_key}",
                                       body)

        return (code == 201) or (code == 204)

main.py

테스트용 코드로 별도로 필요하진 않습니다.

from pyrabbit.rabbitmq_util import RabbitMqUtil

rabbit = RabbitMqUtil("localhost")

VHOST_NAME = "snowdeer_vhost"
EXCHANGE_NAME = "snowdeer_exchange"
QUEUE_NAME = "snowdeer_queue"


def main():
    print("<OVERVIEW>")
    overview = rabbit.get_overview()
    print(f"overview: {overview}")

    test_users()
    test_vhosts(VHOST_NAME)
    test_exchanges(VHOST_NAME, EXCHANGE_NAME)
    test_queues(VHOST_NAME, QUEUE_NAME)
    test_bindings(VHOST_NAME, EXCHANGE_NAME, QUEUE_NAME)
    # delete_resources(VHOST_NAME, EXCHANGE_NAME, QUEUE_NAME)


def test_users():
    print("\n<TEST USERS>")

    users = rabbit.get_users()
    print(f"users: {users}")


def test_vhosts(vhost_name):
    print("\n<TEST VHOSTS>")

    vhosts = rabbit.get_vhosts()
    print(f"vhosts: {vhosts}")

    ret = rabbit.create_vhosts(vhost_name)
    print(f"create vhost: {ret}")

    vhosts = rabbit.get_vhosts()
    print(f"vhosts: {vhosts}")


def test_exchanges(vhost, exchange):
    print("\n<TEST EXCHANGES>")

    exchanges = rabbit.get_exchanges(vhost)
    print(f"exchanges: {exchanges}")

    ret = rabbit.create_exchange(vhost, exchange)
    print(f"create exchanges: {ret}")

    exchanges = rabbit.get_exchanges(vhost)
    print(f"exchanges: {exchanges}")


def test_queues(vhost, queue):
    print("\n<TEST QUEUES>")

    queues = rabbit.get_queues(vhost)
    print(f"queues: {queues}")

    ret = rabbit.create_queue(vhost, queue)
    print(f"create queue: {ret}")

    queues = rabbit.get_queues(vhost)
    print(f"queues: {queues}")

    ret = rabbit.purge_queue(vhost, queue)
    print(f"purge queue: {ret}")


def test_bindings(vhost, exchange, queue):
    print("\n<TEST BINDINGS>")

    bindings = rabbit.get_bindings(vhost)
    print(f"queues: {bindings}")

    bindings = rabbit.get_queue_bindings(vhost, queue)
    print(f"queues: {bindings}")

    ret = rabbit.create_binding(vhost, exchange, queue, routing_key="hello")
    print(f"create binding: {ret}")

    bindings = rabbit.get_bindings(vhost)
    print(f"queues: {bindings}")

    bindings = rabbit.get_queue_bindings(vhost, queue)
    print(f"queues: {bindings}")


def delete_resources(vhost, exchange, queue):
    ret = rabbit.delete_queue(vhost, queue)
    print(f"delete queue: {ret}")

    queues = rabbit.get_queues(vhost)
    print(f"queues: {queues}")

    ret = rabbit.delete_exchange(vhost, exchange)
    print(f"delete exchanges: {ret}")

    exchanges = rabbit.get_exchanges(vhost)
    print(f"exchanges: {exchanges}")

    ret = rabbit.delete_vhosts(VHOST_NAME)
    print(f"delete vhost: {ret}")

    vhosts = rabbit.get_vhosts()
    print(f"vhosts: {vhosts}")


def test_apis():
    rabbit.create_vhosts("test_vhost")

    rabbit.create_exchange("test_vhost", "ex1")
    rabbit.create_exchange("test_vhost", "ex2")

    rabbit.create_queue("test_vhost", "q1")
    rabbit.create_queue("test_vhost", "q2")
    rabbit.create_queue("test_vhost", "q3")

    rabbit.create_binding("test_vhost", "ex1", "q1")
    rabbit.create_binding("test_vhost", "ex2", "q2", routing_key="hello")
    rabbit.create_binding("test_vhost", "ex2", "q3", routing_key="snowdeer_binding")


if __name__ == '__main__':
    main()
    # test_apis()