Git Submodule을 이용한 Python 라이브러리 패키지 관리(Docker 빌드 가능)

|

Git Submodule을 이용한 Python 라이브러리 패키지 관리

Git Submodule을 이용한 프로젝트 추가

git submodule add https://github.com/snowdeer/python_sample_library libs/python_sample_library

Install Package

pip3 install libs/python_sample_library

Dockerfile 생성

# Base image: official CPython 3.8
FROM python:3.8
# Relative paths in the COPY/RUN/CMD instructions below resolve against /app
WORKDIR /app

# Install regular packages
# (requirements.txt is copied on its own, before the rest of the source,
# so this step depends only on that one file)
COPY requirements.txt .
RUN pip install -r requirements.txt

# Install submodule packages
# (the git submodule checkout under libs/ is installed from its local path,
# mirroring the `pip3 install libs/python_sample_library` step used on the host)
COPY libs/python_sample_library libs/python_sample_library
RUN pip3 install libs/python_sample_library

# copy source code
COPY ./ .

# command to run on container start
CMD [ "python", "./app.py"]

Python Redis RPC 예제

|

Python Redis RPC

redis_rpc.py

import json
import uuid
import redis
import logging

logger = logging.getLogger('redis_rpc')


class TimeoutException(Exception):
    """Raised by the RPC client when no response arrives before its timeout."""


class Server(object):
    """Blocking RPC server that uses Redis lists as its transport.

    Requests are JSON objects popped from the list ``pyredisrpc:<name>`` and
    must look like ``{"id": str, "method": str, "params": [args, kwargs]}``.
    The response ``{"id", "result", "error"}`` is pushed to the list
    ``pyredisrpc:<id>`` and expires after 60 seconds.
    """

    def __init__(self, name, host='localhost', port=6379):
        """Connect to Redis and derive the request queue key from *name*."""
        self.__redis = redis.Redis(host=host, port=port)
        self.__prefix = 'pyredisrpc:'
        self.__queue = self.__prefix + name
        self.__response_expire_time = 60
        self.__methods = {}
        self.__is_running = False

    def start(self):
        """Serve requests until :meth:`shutdown` clears the running flag.

        The loop blocks on the request queue, so the flag is only re-checked
        after the next request has been popped.
        """
        logger.debug('RedisRpcServer::start()')
        # NOTE(review): FLUSHALL removes every key on the Redis server, not
        # just this RPC queue. Kept for backward compatibility - confirm the
        # Redis instance is dedicated to this service.
        self.__redis.flushall()

        self.__is_running = True
        while self.__is_running:
            _, raw = self.__redis.blpop(self.__queue)
            try:
                req_data = raw.decode()
            except UnicodeDecodeError:
                # A malformed payload must not take the whole server down.
                logger.warning(f'Exception - invalid request(not UTF-8)\n{raw!r}')
                continue

            req_args = self.__parse_request(req_data)
            if req_args is None:
                continue

            req_id, method, params, timeout_check = req_args
            if timeout_check and self.__is_timeout_expired(req_id):
                # The client has already given up on this request.
                continue

            self.__call_method(req_id, method, params)

    def shutdown(self):
        """Ask the serving loop to stop after the request it is waiting for."""
        logger.debug('RedisRpcServer::shutdown()')
        self.__is_running = False

    def __parse_request(self, req_data):
        """Validate a raw JSON request.

        Returns ``(req_id, method, params, timeout_check)`` for a valid
        request, otherwise ``None``. When the request carries a usable id the
        validation error is also sent back to the caller.
        """
        try:
            req = json.loads(req_data)
        except json.JSONDecodeError:
            logger.warning(f'Exception - invalid request(JSON Dcode error)\n{req_data}')
            return None

        # Without a string id there is no response key to answer on, so the
        # request can only be logged and dropped.
        if not isinstance(req, dict) or not isinstance(req.get('id'), str):
            logger.warning(f'Exception - invalid request(missing or non-string id)\n{req_data}')
            return None
        req_id = req['id']

        try:
            method = req['method']
            params = req['params']
        except KeyError as e:
            key = e.args[0]
            self.__send_response(req_id, None, f'Exception - invalid request key({key})')
            return None

        # The isinstance guard keeps an unhashable JSON value (list/dict) from
        # raising TypeError in the dict membership test.
        if not isinstance(method, str) or method not in self.__methods:
            self.__send_response(req_id, None, f'Exception - invalid method({method})')
            return None

        params_ok = (
            isinstance(params, list)
            and len(params) == 2
            and isinstance(params[0], list)
            and isinstance(params[1], dict)
        )
        if not params_ok:
            self.__send_response(req_id, None, f'Exception - invalid params({params})')
            return None

        timeout_check = req.get('timeout') == 1

        return req_id, method, params, timeout_check

    def __is_timeout_expired(self, req_id):
        """Return True when the client's ``<id>:timeout`` marker key is gone."""
        timeout_key = self.__prefix + req_id + ':timeout'
        return self.__redis.get(timeout_key) is None

    def __call_method(self, req_id, method, params):
        """Invoke the registered function and send its result or its error."""
        func = self.__methods[method]
        params_args, params_kw = params

        try:
            result = func(*params_args, **params_kw)
            logger.info(f'{method}() is requested. params: {params}, result: {result}')
            self.__send_response(req_id, result)
        except Exception as e:
            # RPC boundary: any failure is reported to the caller, not raised.
            self.__send_response(req_id, None, f'Exception - {repr(e)}')

    def __send_response(self, req_id, result, error_message=None):
        """Push the JSON response for *req_id* and set its expiry."""
        if error_message is not None:
            logger.warning(f'Exception - {error_message}')

        result = {'id': req_id, 'result': result, 'error': error_message}

        key = self.__prefix + req_id
        self.__redis.rpush(key, json.dumps(result))
        self.__redis.expire(key, self.__response_expire_time)

    def method(self, f):
        """Register *f* under ``f.__name__``; usable as a decorator.

        Returns *f* so the decorated name stays bound to the function.
        """
        self.__methods[f.__name__] = f
        return f


class Client(object):
    """RPC client proxy: ``client.foo(1, b=2)`` calls ``foo`` on the server.

    Each call pushes a JSON request onto the list ``pyredisrpc:<name>`` and
    blocks on the list ``pyredisrpc:<request id>`` for the response.
    """

    def __init__(self, name, host='localhost', port=6379, timeout=0):
        """Connect to Redis.

        *timeout* is the number of seconds to wait for a response; ``0``
        disables the timeout marker and is passed to ``blpop`` unchanged.
        """
        self.__redis = redis.Redis(host=host, port=port)
        self.__prefix = 'pyredisrpc:'
        self.__queue = self.__prefix + name
        self.__timeout = timeout

    def __call(self, method, params):
        """Send one request and return its result.

        Raises:
            TimeoutException: no response was popped within the timeout.
            Exception: the server reported an error for this request.
        """
        req_id = uuid.uuid4().hex
        req = {'id': req_id, 'method': method, 'params': params}

        if self.__timeout != 0:
            req['timeout'] = 1
            # Marker key that expires with the timeout; the server skips
            # requests flagged with 'timeout' whose marker no longer exists.
            timeout_key = self.__prefix + req_id + ':timeout'
            self.__redis.set(timeout_key, 1, ex=self.__timeout)

        self.__redis.rpush(self.__queue, json.dumps(req))
        key = self.__prefix + req_id
        res = self.__redis.blpop(key, self.__timeout)

        if not res:
            raise TimeoutException(req, key)

        _, response_data = res
        response = json.loads(response_data.decode())

        if response['error'] is not None:
            raise Exception(response['error'])

        return response['result']

    def __getattr__(self, method):
        """Return a callable that performs the remote call named *method*.

        Dunder names are refused so that protocol probes such as
        ``__deepcopy__`` or ``__setstate__`` (copy, pickle, hasattr checks)
        are not answered with an RPC stub.
        """
        if method.startswith('__') and method.endswith('__'):
            raise AttributeError(method)

        def wrap(*args, **kw):
            return self.__call(method, [args, kw])

        return wrap

server.py

import logging

import redis_rpc as pyredisrpc
from colored_log_handler import ColoredLogHandler

# Route all root-logger output through the colored handler at DEBUG level.
logging.basicConfig(level="DEBUG", handlers=[ColoredLogHandler()])
# Script-level logger; not used further in this snippet.
logger = logging.getLogger('main')

# The name must match the one the client passes, since both sides derive the
# Redis queue key from it.
server = pyredisrpc.Server('snowdeer-redis-rpc-example')


# Expose add() to RPC clients under the method name 'add'.
@server.method
def add(a, b):
    """Return the sum of the two arguments sent by the client."""
    return a + b


# start() runs the blocking request loop; Ctrl+C is the way out of it here.
try:
    server.start()
except KeyboardInterrupt:
    print("Interrupted")

server.shutdown()

client.py

import logging
import time
import random

import redis_rpc as pyredisrpc
from colored_log_handler import ColoredLogHandler

# Route all root-logger output through the colored handler at DEBUG level.
logging.basicConfig(level="DEBUG", handlers=[ColoredLogHandler()])

# Same queue name as the server; wait at most one second per call.
client = pyredisrpc.Client('snowdeer-redis-rpc-example', timeout=1)

for i in range(200):
    x, y = random.randint(1, 10), random.randint(1, 10)
    ret = client.add(x, y)

    # Correct sums are logged at INFO, mismatches at WARNING.
    report = logging.info if ret == (x + y) else logging.warning
    report(f'# test({i}) - ({x}, {y} --> {ret})')

    time.sleep(0.5)

Python logging 예제

|

Python logging 모듈 사용하기

Python에 기본으로 내장된 logging 모듈에 대한 사용법입니다. 로그 레벨에 따라 debug, info, warning, error, critical의 5가지 등급이 있으며, 이 중에서 warning은 warn이라는 메소드도 같이 존재합니다. warn은 deprecated 되었기 때문에, 가급적 warn은 사용하지 말고 warning을 사용하도록 주의합시다.

간단한 예제

import logging


def main():
    """Emit one sample message at each of the five standard log levels."""
    samples = (
        (logging.DEBUG, 'debug'),
        (logging.INFO, 'info'),
        (logging.WARNING, 'warning'),
        (logging.ERROR, 'error'),
        (logging.CRITICAL, 'critical'),
    )
    for level, text in samples:
        logging.log(level, text)


if __name__ == '__main__':
    # Show every level, with the source file and line appended to each record.
    logging.basicConfig(
        format='[%(asctime)s] [%(levelname)s] %(message)s (%(filename)s:%(lineno)d)',
        level=logging.DEBUG,
    )
    main()


Colored logging 사용하기

여기에 색상을 입히는 코드를 적용해봅니다.

color_palette.py

# ANSI escape sequences for styling terminal text.
# Usage: prefix the text with one of the constants below and append RESET,
# e.g. GREEN + 'message' + RESET.

# Reset
# (restores the terminal's default rendition)
RESET = '\033[0m'

# Regular Colors
# (foreground codes 30-37)
BLACK = '\033[30m'
RED = '\033[31m'
GREEN = '\033[32m'
YELLOW = '\033[33m'
BLUE = '\033[34m'
MAGENTA = '\033[35m'
CYAN = '\033[36m'
WHITE = '\033[37m'

# Bold
# (attribute 1 combined with a foreground code)
BOLD_BLACK = '\033[1;30m'
BOLD_RED = '\033[1;31m'
BOLD_GREEN = '\033[1;32m'
BOLD_YELLOW = '\033[1;33m'
BOLD_BLUE = '\033[1;34m'
BOLD_MAGENTA = '\033[1;35m'
BOLD_CYAN = '\033[1;36m'
BOLD_WHITE = '\033[1;37m'

# Underline
# (attribute 4 combined with a foreground code)
UNDERLINE_BLACK = '\033[4;30m'
UNDERLINE_RED = '\033[4;31m'
UNDERLINE_GREEN = '\033[4;32m'
UNDERLINE_YELLOW = '\033[4;33m'
UNDERLINE_BLUE = '\033[4;34m'
UNDERLINE_MAGENTA = '\033[4;35m'
UNDERLINE_CYAN = '\033[4;36m'
UNDERLINE_WHITE = '\033[4;37m'

# High Intensity
# (attribute 0 followed by the bright foreground codes 90-97)
INTENSITY_BLACK = '\033[0;90m'
INTENSITY_RED = '\033[0;91m'
INTENSITY_GREEN = '\033[0;92m'
INTENSITY_YELLOW = '\033[0;93m'
INTENSITY_BLUE = '\033[0;94m'
INTENSITY_MAGENTA = '\033[0;95m'
INTENSITY_CYAN = '\033[0;96m'
INTENSITY_WHITE = '\033[0;97m'

colored_log_handler.py

import logging

from color_palette import RESET, GREEN, WHITE, YELLOW, MAGENTA, RED


class ColoredLogHandler(logging.StreamHandler):
    """Stream handler that colors each record according to its level.

    The handler accepts every level from DEBUG upward and installs a private
    formatter that wraps a per-level layout in ANSI color codes.
    """

    def __init__(self):
        super().__init__()
        self.setLevel(logging.DEBUG)
        self.setFormatter(self.__LogFormatter())

    class __LogFormatter(logging.Formatter):
        """Formatter that picks layout and color from ``record.levelno``."""

        # DEBUG and CRITICAL additionally show where the record was emitted.
        __FORMAT_DEBUG = '[%(asctime)s] [%(levelname)s] %(message)s (%(filename)s:%(lineno)d)'
        __FORMAT_INFO = '[%(asctime)s] [%(levelname)s] %(message)s'
        __FORMAT_WARNING = '[%(asctime)s] [%(levelname)s] %(message)s'
        __FORMAT_ERROR = '[%(asctime)s] [%(levelname)s] %(message)s'
        __FORMAT_CRITICAL = '[%(asctime)s] [%(levelname)s] %(message)s (%(filename)s:%(lineno)d)'

        FORMATS = {
            logging.DEBUG: GREEN + __FORMAT_DEBUG + RESET,
            logging.INFO: WHITE + __FORMAT_INFO + RESET,
            logging.WARNING: YELLOW + __FORMAT_WARNING + RESET,
            logging.ERROR: MAGENTA + __FORMAT_ERROR + RESET,
            logging.CRITICAL: RED + __FORMAT_CRITICAL + RESET
        }

        def __init__(self):
            super().__init__()
            # Build one Formatter per level once, instead of allocating a new
            # Formatter for every record that is logged.
            self.__formatters = {
                levelno: logging.Formatter(fmt)
                for levelno, fmt in self.FORMATS.items()
            }
            # Levels outside FORMATS (custom levels) keep the timestamp and
            # level name but are left uncolored.
            self.__fallback = logging.Formatter(self.__FORMAT_INFO)

        def format(self, record):
            """Format *record* with the layout registered for its level."""
            formatter = self.__formatters.get(record.levelno, self.__fallback)
            return formatter.format(record)

main.py

import logging
from colored_log_handler import ColoredLogHandler


def main():
    """Log one sample message per standard level through the root logger."""
    for level, text in (
        (logging.DEBUG, 'debug'),
        (logging.INFO, 'info'),
        (logging.WARNING, 'warning'),
        (logging.ERROR, 'error'),
        (logging.CRITICAL, 'critical'),
    ):
        logging.log(level, text)


if __name__ == '__main__':
    # Install the colored handler as the root logger's only handler.
    logging.basicConfig(handlers=[ColoredLogHandler()], level="DEBUG")

    main()

ZSH Plug-in 소개

|

ZSH Plug-in 소개

zsh 셸에서 키워드를 입력한 다음 화살표 위 방향 키로 입력된 키워드가 포함된 과거 실행 명령어를 보여주는 유용한 플러그인입니다.

 git clone https://github.com/zsh-users/zsh-history-substring-search ${ZSH_CUSTOM:-~/.oh-my-zsh/custom}/plugins/zsh-history-substring-search

syntax-highlighting

git clone https://github.com/zsh-users/zsh-syntax-highlighting.git ${ZSH_CUSTOM:-~/.oh-my-zsh/custom}/plugins/zsh-syntax-highlighting

~/.zshrc 설정

# Plugins for oh-my-zsh to load at shell start-up.
# NOTE(review): zsh-history-substring-search, which this post clones earlier, is
# not listed here, while alias-tips and zsh-autosuggestions are listed without
# install steps - confirm the intended plugin set.
plugins=(
  git
  alias-tips
  zsh-autosuggestions
  zsh-syntax-highlighting
)

Flutter Stream 예제

|

sample_timer.dart

import 'dart:async';

/// Countdown state that publishes its remaining seconds once per second.
class SampleTimer {
  /// Seconds left on the countdown.
  int seconds = 0;

  /// Whether ticks currently decrement [seconds].
  bool isRunning = false;

  // Declared but never assigned or read inside this class.
  late Timer timer;

  /// Begins counting down from [seconds].
  void start(int seconds) {
    isRunning = true;
    this.seconds = seconds;
  }

  /// Pauses the countdown; [seconds] keeps its current value.
  void stop() => isRunning = false;

  /// Emits the current value of [seconds] every second, whether or not the
  /// countdown is running.
  Stream<int> stream() async* {
    yield* Stream<int>.periodic(const Duration(seconds: 1), (_) => _tick());
  }

  /// Advances the countdown by one step and returns the remaining seconds.
  int _tick() {
    if (!isRunning) {
      return seconds;
    }

    seconds -= 1;
    if (seconds <= 0) {
      // Reached zero: stop so the value does not keep decreasing.
      isRunning = false;
    }
    return seconds;
  }
}

timer_page.dart

import 'package:flutter/material.dart';
import 'package:stream_sample/timer/sample_timer.dart';

/// Page with Start/Stop buttons and a label showing the remaining seconds.
class TimerPage extends StatelessWidget {
  TimerPage({Key? key}) : super(key: key);

  final timer = SampleTimer();

  // Obtained once per widget instead of inside build(), so a rebuild does not
  // hand StreamBuilder a brand-new stream and force it to resubscribe.
  late final Stream<int> _ticks = timer.stream();

  @override
  Widget build(BuildContext context) {
    return Column(
      children: [
        Row(
          children: [
            const Padding(padding: EdgeInsets.all(8)),
            Expanded(
              child: MaterialButton(
                color: Colors.amber,
                onPressed: () => timer.start(100),
                child: const Text('Start'),
              ),
            ),
            const Padding(padding: EdgeInsets.all(8)),
            Expanded(
              child: MaterialButton(
                color: Colors.amber,
                onPressed: () => timer.stop(),
                child: const Text('Stop'),
              ),
            ),
            const Padding(padding: EdgeInsets.all(8)),
          ],
        ),
        StreamBuilder<int>(
          stream: _ticks,
          // Without initial data the label would read "null" until the
          // first tick arrives one second later.
          initialData: timer.seconds,
          builder: (context, snapshot) {
            return Text(
              snapshot.data.toString(),
              style: const TextStyle(
                fontSize: 30,
              ),
            );
          },
        )
      ],
    );
  }
}

main.dart

import 'package:flutter/material.dart';
import 'package:stream_sample/timer_page.dart';

/// Application entry point: mounts [MyApp] as the root widget.
void main() => runApp(const MyApp());

/// Root widget: a Material app whose only screen hosts [TimerPage].
class MyApp extends StatelessWidget {
  const MyApp({Key? key}) : super(key: key);

  @override
  Widget build(BuildContext context) {
    final appBar = AppBar(
      title: const Text('snowdeer\'s timer sample'),
    );
    final home = Scaffold(
      appBar: appBar,
      body: TimerPage(),
    );

    return MaterialApp(
      debugShowCheckedModeBanner: false,
      home: home,
    );
  }
}