라즈베리파이에서 오디오 녹음하기

|

라즈베리파이에서 오디오 파일로 녹음하기

pyalsaaudio 모듈 설치

라즈베리파이(Raspberry PI)에서 Python을 이용해서 오디오 녹음을 하는 예제입니다.

라즈베리파이도 기본적으로 Linux 기반이다보니 ‘ALSA’ 오디오 라이브러리를 사용합니다. Python에서도 마찬가지로 pyalsaaudio라는 모듈을 설치해야 오디오 장비에 접근할 수 있습니다.

터미널에서 다음 명령어로 pyalsaaudio 모듈을 설치합니다.

sudo pip3 install pyalsaaudio

만약 pyalsaaudio 설치 도중에

alsaaudio.c:28:28: fatal error: alsa/asoundlib.h: No such file or directory
     #include <alsa/asoundlib.h>
                                ^
    compilation terminated.
    error: command 'arm-linux-gnueabihf-gcc' failed with exit status 1

위와 같은 오류 메세지가 발생했을 경우에는 다음 명령어를 이용해서 libasound2-dev를 먼저 설치해줘야 합니다.

sudo apt-get install libasound2-dev


오디오 녹음하는 Python 예제 코드

from __future__ import print_function

import sys
import time
import getopt
import alsaaudio


def usage():
    print('usage: recordtest.py [-d <device>] <file>', file=sys.stderr)
    sys.exit(2)


if __name__ == '__main__':

    device = 'default'

    opts, args = getopt.getopt(sys.argv[1:], 'd:')
    for o, a in opts:
        if o == '-d':
            device = a

    if not args:
        usage()

    f = open(args[0], 'wb')

    inp = alsaaudio.PCM(alsaaudio.PCM_CAPTURE, alsaaudio.PCM_NONBLOCK, device=device)

    inp.setchannels(1)
    inp.setrate(44100)
    inp.setformat(alsaaudio.PCM_FORMAT_S16_LE)
    inp.setperiodsize(160)

    loops = 1000000
    while loops > 0:
        loops -= 1
        l, data = inp.read()

        if l:
            f.write(data)
            time.sleep(.001)


aplay로 녹음된 파일 확인

aplay로 정상적으로 녹음이 되었는지 확인합니다.

aplay -t raw -c 1 -f S16_LE -r 44100 test.raw

Condition Variable 활용한 Producer/Consumer 예제

|

Condition Variable

조건 변수(Condition Variable)는 내부에 Thread 대기 큐를 갖고 있습니다. wait() 메소드가 호출된 Thread는 이 대기 큐에 들어가게 되고 Sleep 상태가 되며, notify()notifyAll() 메소드에 의해 깨어나게 됩니다.

조건 변수를 활용한 생산자/소비자 패턴(Producer/Consumer Pattern) 예제는 다음과 같습니다.


Producer/Consumber 패턴 예제

Producer는 물건(item)을 생산하여 queue에 차곡차곡 쌓아가며, Consumer는 queue에 쌓여있는 물건을 하나씩 가져갑니다. 단, queue에 쌓인 물건이 하나도 없을 경우에는 대기(wait)를 합니다.

import threading
import time

CONSUMER_COUNT = 10
PRODUCER_COUNT = CONSUMER_COUNT // 2

queue = []
cv = threading.Condition()

item_id = 0

class Consumer(threading.Thread):
    def __init__(self, id):
        threading.Thread.__init__(self)
        self.id = id

    def run(self):
        for i in range(5):
            cv.acquire()
            while len(queue) < 1:
                print('consumer({}) waiting...'.format(self.id))
                cv.wait()
            print('consumer({}) -> item({})'.format(self.id, queue.pop(0)))
            cv.release()
            time.sleep(0.5)


class Producer(threading.Thread):
    def run(self):
        global item_id
        for i in range(10):
            cv.acquire()
            item_id += 1
            queue.append(item_id)
            cv.notify()
            cv.release()
            time.sleep(0.7)


threads = []

for i in range(CONSUMER_COUNT):
    threads.append(Consumer(i))

for i in range(PRODUCER_COUNT):
    threads.append(Producer())

for th in threads:
    th.start()

for th in threads:
    th.join()

print('<End>')


실행 결과

consumer(0) waiting...
consumer(1) waiting...
consumer(2) waiting...
consumer(3) waiting...
consumer(4) waiting...
consumer(5) waiting...
consumer(6) waiting...
consumer(7) waiting...
consumer(8) waiting...
consumer(9) waiting...
consumer(0) -> item(1)
consumer(1) -> item(2)
consumer(2) -> item(3)
consumer(3) -> item(4)
consumer(4) -> item(5)
consumer(4) waiting...
consumer(2) waiting...
consumer(3) waiting...
consumer(0) waiting...
consumer(1) waiting...
consumer(6) -> item(6)
consumer(7) -> item(7)
consumer(8) -> item(8)
consumer(5) -> item(9)
consumer(9) -> item(10)
consumer(9) waiting...
consumer(8) waiting...
consumer(5) waiting...
consumer(6) waiting...
consumer(7) waiting...
consumer(4) -> item(11)
consumer(2) -> item(12)
consumer(0) -> item(13)
consumer(3) -> item(14)
consumer(1) -> item(15)
consumer(1) waiting...
consumer(4) waiting...
consumer(0) waiting...
consumer(2) waiting...
consumer(3) waiting...
consumer(9) -> item(16)
consumer(8) -> item(17)
consumer(5) -> item(18)
consumer(6) -> item(19)
consumer(7) -> item(20)
consumer(9) waiting...
consumer(6) waiting...
consumer(7) waiting...
consumer(8) waiting...
consumer(5) waiting...
consumer(1) -> item(21)
consumer(4) -> item(22)
consumer(0) -> item(23)
consumer(3) -> item(24)
consumer(2) -> item(25)
consumer(1) waiting...
consumer(2) waiting...
consumer(3) waiting...
consumer(0) waiting...
consumer(4) waiting...
consumer(9) -> item(26)
consumer(6) -> item(27)
consumer(7) -> item(28)
consumer(8) -> item(29)
consumer(5) -> item(30)
consumer(9) waiting...
consumer(5) waiting...
consumer(8) waiting...
consumer(6) waiting...
consumer(7) waiting...
consumer(1) -> item(31)
consumer(3) -> item(32)
consumer(0) -> item(33)
consumer(2) -> item(34)
consumer(4) -> item(35)
consumer(1) waiting...
consumer(2) waiting...
consumer(3) waiting...
consumer(4) waiting...
consumer(0) waiting...
consumer(9) -> item(36)
consumer(5) -> item(37)
consumer(8) -> item(38)
consumer(6) -> item(39)
consumer(7) -> item(40)
consumer(9) waiting...
consumer(6) waiting...
consumer(7) waiting...
consumer(8) waiting...
consumer(5) waiting...
consumer(1) -> item(41)
consumer(2) -> item(42)
consumer(3) -> item(43)
consumer(4) -> item(44)
consumer(0) -> item(45)
consumer(9) -> item(46)
consumer(7) -> item(47)
consumer(6) -> item(48)
consumer(8) -> item(49)
consumer(5) -> item(50)
<End>


실제 Queue를 활용한 Producer/Consumer 패턴 예제

위의 예제에서는 queue라는 이름을 가진 리스트를 사용해서 구현했지만, 파이썬에서는 Multi-Threading 환경에서 사용할 수 있는 queue 모듈을 제공하고 있습니다.

queue 모듈을 사용한 예제는 아래와 같습니다.

import threading
import time
from queue import Queue

CONSUMER_COUNT = 10
PRODUCER_COUNT = CONSUMER_COUNT // 2

que = Queue(100)
item_id = 0

class Consumer(threading.Thread):
    def __init__(self, id):
        threading.Thread.__init__(self)
        self.id = id

    def run(self):
        for i in range(5):
            print('consumer({}) -> item({})'.format(self.id, que.get()))
            time.sleep(0)

class Producer(threading.Thread):
    def run(self):
        global item_id
        for i in range(10):
            item_id += 1
            que.put(item_id)
            time.sleep(0.7)

threads = []

for i in range(CONSUMER_COUNT):
    threads.append(Consumer(i))

for i in range(PRODUCER_COUNT):
    threads.append(Producer())

for th in threads:
    th.start()

for th in threads:
    th.join()

print('<<End>')

Thread Example

|

Python의 Thread

GIL

대부분의 프로그램에서 Thread는 아주 많이 활용되고 있습니다. 하지만, Python의 Thread는 제약이 있습니다. GIL(Global Interpreter Lock)이라고 불리우는 전역 인터프리터 락이 존재합니다. GIL 때문에 여러 개의 코어(Core)를 갖고 있는 CPU에서 프로그램을 실행하더라도 단 하나의 코어만 활용하도록 되어 있습니다. 그 덕분에 Thread를 사용하게 되면 Thread를 사용하지 않았을 때보다 성능이 훨씬 더 저하되는 현상이 발생합니다.

한 동안 GIL을 제거하려는 논의가 있었지만, GIL은 여전히 존재하고 있습니다. GIL을 제거했을 때 얻는 장점과 추가로 생기는 단점의 큰 차이가 없어서 GIL을 유지하는 것으로 결정되었다고 합니다.

GIL은 CPU 기반의 작업에서만 아주 크게 영향을 미칩니다. I/O 작업(대부분의 시스템 콜(System Call)이나 네트워크 작업들)에서는 Thread를 사용할 경우 성능적인 장점이 있습니다. 시스템 콜을 하게 될 경우 그 순간 GIL은 해제되어 다른 작업을 할 수 있게 하고, 시스템 콜이 끝나면 다시 GIL을 획득하도록 되어 있어 Thread로 성능 향상을 가져올 수 있습니다.

그 외 CPU 기반의 다중 작업을 해야 하는 경우는 Thread 보다는 멀티프로세스(Multi Process)를 활용하는 것이 좋습니다.

Thread를 이용하는 방법은 여러가지가 있는데 그 중 threading 모듈을 이용해서 사용하는 것을 추천합니다. 저수준의 _thread 모듈과 고수준의 threading 모듈이 있는데 threading 모듈만 이용해도 대부분의 작업을 처리할 수 있으며 다뤄야 할 부분이 훨씬 적기 때문에 간단하고 안전하게 사용할 수 있기 때문입니다.


특정 함수를 Multi-Threading으로 실행

import threading
import time


def loop(id):
    print('Thread({}) is starting...'.format(id))

    for i in range(10):
        print('Thread({}) - {}'.format(id, i))
        time.sleep(0)

    print('Thread({}) is finished...'.format(id))


threads = []

if __name__ == '__main__':
    for i in range(5):
        th = threading.Thread(target=loop, args=(i,))
        threads.append(th)
        th.start()

    for th in threads:
        th.join()

    print('<End>')


특정 클래스를 Multi-Threading으로 실행

import threading
import time

class MyThread(threading.Thread):
    def run(self):
        print('Thread({}) is starting...'.format(self.getName()))

        for i in range(10):
            print('Thread({}) - {}'.format(self.getName(), i))
            time.sleep(0)

        print('Thread({}) is finished...'.format(self.getName()))

threads = []

if __name__ == '__main__':
    for i in range(5):
        th = MyThread()
        threads.append(th)
        th.start()

    for th in threads:
        th.join()

    print('<End>')


Thread Lock

import threading
import time

count = 0

class MyThread(threading.Thread):
    def run(self):
        global count

        for i in range(10):
            lock.acquire()
            count += 1
            lock.release()

threads = []
lock = threading.Lock()

if __name__ == '__main__':
    for i in range(5):
        th = MyThread()
        threads.append(th)
        th.start()

    for th in threads:
        th.join()

    print('Total Count: {}'.format(count))

만약 acquire() 메소드를 여러 번 호출하려면 Lock 대신 RLock 클래스를 활용하면 됩니다. 물론, 해제할 때는 acquire() 메소드 호출 횟수만큼 release()를 호출해야합니다.

TCP Echo Server/Client

|

Echo Server

Python 3.x 에서 동작하는 TCP 기반의 Echo Server/Client 예제 코드입니다.

import socketserver
import sys

class TcpEchoServerHandler(socketserver.BaseRequestHandler):
    def handle(self):
        print('Client is connected : {0}'.format(self.client_address[0]))
        sock = self.request

        buffer = sock.recv(1024)
        received_message = str(buffer, encoding='utf-8')
        print('Received : {0}'.format(received_message))
        sock.send(buffer)
        sock.close()

if __name__ == '__main__':
    if len(sys.argv) < 2:
        print('{0} [Bind IP]'.format(sys.argv[0]))
        sys.exit()

    bindIP = sys.argv[1]
    bindPort = 10070

    server = socketserver.TCPServer((bindIP, bindPort), TcpEchoServerHandler)

    print("Start Echo-Server")
    server.serve_forever()


Echo Client

import socket
import sys

serverPort = 10070

if __name__ == '__main__':
    if len(sys.argv) < 4:
        print('{0} [BindIP] [Server IP] [Message]'.format(sys.argv[0]))
        sys.exit()

    bindIP = sys.argv[1]
    serverIP = sys.argv[2]
    message = sys.argv[3]

    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.bind((bindIP, 0))

    try:
        sock.connect((serverIP, serverPort))

        buffer = bytes(message, encoding='utf-8')
        sock.send(buffer)
        print('Sended message : {0}'.format(message))

        buffer = sock.recv(1024)
        received_message = str(buffer, encoding='utf-8')
        print('Received message : {0}'.format(received_message))

    finally:
        sock.close()

정규식 사용하기

|

Python에서 re 모듈은 정규 표현식(Regular Expression)에 관련된 라이브러리입니다.

^$ 연산자를 사용해서 각 줄의 시작과 끝을 매칭 가능하며 대소문자를 무시하는 re.I 플래그나 여러 줄로 이루어진 텍스트에 적용할 수 있는 re.M 플래그를 많이 사용합니다.

정규식 문법

기본 연산자

연산자 | 설명 —|— . | 개행 문자를 제외한 모든 문자 a | 문자 a ab | 문자열 ab x|y x나 y \y | 특수문자(^+{}$()[]|-?.* 등)에서 탈출


문자열 클래스

연산자 설명
[a-d] a 부터 d 까지 중 문자 1개
[^a-d] a, b, c, d를 제외한 문자 1개
\d 숫자(digit) 1개
\D 숫자가 아닌 개체 1개
\s 공백 1개
\S 공백이 아닌 개체 1개
\w 알파벳 또는 숫자 1개
\W 알파벳이나 숫자가 아닌 개체 1개


양적 연산자

연산자 설명
x* 0개 이상의 x
x+ 1개 이상의 x
x? 0 또는 1개의 x
x{2} x가 정확히 2개
x{2, 5} x가 2개부터 5개까지


탈출 연산자

연산자 설명
\n 새로운 라인(개행 문자)
\r 캐리지 리턴(현재 줄의 맨 앞)
\t 탭(tab)


위치 지정

연산자 설명
^ 문자열의 처음
\b 단어 경계
\B 비단어 경계
$ 문자열의 끝


그룹

연산자 설명
(x) 캡처링 그룹(capturing group)
(?:x) 비캡처링 그룹(non-capturing group)


정규식 예제

이메일 주소

r"\w[-\w\.]*@\w[-\w]*(\.\w[-\w]*)+"

HTML 태그

r"<TAG\b[^>]*<(.*?)</TAG>"

부동 소수점

r"[-+]?((\d*\.?\d+)|(\d\.))([eE][-+]?\d+)?"