Python pybluez

socket

ํŒŒ์ด์ฌ์— ๊ธฐ๋ณธ ๋‚ด์žฅ๋˜์–ด์žˆ๋Š” socket ๋ชจ๋“ˆ์„ ํ†ตํ•ด ๊ฐ„๋‹จํ•œ ๋ธ”๋ฃจํˆฌ์Šค ํ†ต์‹ ์„ ๊ตฌ์ถ• ํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

๋‹จ ์ฐธ๊ณ ํ•  ๋ฌธ์„œ๊ฐ€ ๊ฑฐ์˜ ์—†๊ธฐ ๋•Œ๋ฌธ์— ๋‹จ์ˆœ ํ†ต์‹  ๊ธฐ๋Šฅ ์™ธ์—๋Š” ํ”„๋กœ๊ทธ๋žจ ๊ฐœ๋ฐœ์ด ์–ด๋ ค์šธ ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.
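As a rough sketch of the standard-library route, Linux builds of Python compiled with Bluetooth support expose `socket.AF_BLUETOOTH` and `socket.BTPROTO_RFCOMM`. The helper names `bluetooth_supported` and `open_rfcomm_server` below are illustrative assumptions, not part of any library.

```python
import socket


def bluetooth_supported():
    """Return True if this Python build exposes Bluetooth RFCOMM sockets."""
    return hasattr(socket, "AF_BLUETOOTH") and hasattr(socket, "BTPROTO_RFCOMM")


def open_rfcomm_server(address, channel):
    """Create a listening RFCOMM socket bound to (address, channel).

    address is the local controller MAC as a string, e.g. "00:1A:7D:DA:71:13".
    channel is the RFCOMM channel number the client must also use.
    """
    if not bluetooth_supported():
        raise OSError("this Python build has no Bluetooth socket support")
    server = socket.socket(
        socket.AF_BLUETOOTH, socket.SOCK_STREAM, socket.BTPROTO_RFCOMM
    )
    server.bind((address, channel))
    server.listen(1)
    return server
```

The returned socket is used like any other stream socket: call `accept()`, then `recv()` and `send()`. The socket module has no SDP helpers, so with this approach the client has to know the channel number in advance.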

Installing pybluez

sudo apt install -y bluez bluetooth libbluetooth-dev python3-pip \
&& python3 -m pip install pybluez

Additional installation for BLE

sudo apt install -y pkg-config libboost-python-dev \
libboost-thread-dev libglib2.0-dev python3-dev \
&& python3 -m pip install pybluez\[ble\]

Modifying bluetooth.service

Add the -C (--compat) option to the bluetoothd start command.

sudo vim /lib/systemd/system/bluetooth.service
...
ExecStart=/usr/lib/bluetooth/bluetoothd -C
...
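To confirm the edit took effect, the unit file can be checked from Python. This is only a sketch: `has_compat_flag` is a hypothetical helper, and it assumes the option appears on an `ExecStart=` line as in the listing above.

```python
import shlex


def has_compat_flag(unit_text):
    """Return True if any ExecStart line passes -C or --compat."""
    for line in unit_text.splitlines():
        line = line.strip()
        if not line.startswith("ExecStart="):
            continue
        # Split the command line the way a shell would, then look for the flag.
        args = shlex.split(line[len("ExecStart="):])
        if "-C" in args or "--compat" in args:
            return True
    return False
```

For example, passing in the text read from /lib/systemd/system/bluetooth.service should return `True` once the option has been added.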

SDP: Service Discovery Protocol

echo -e '\nsudo chmod 777 /var/run/sdp' >> ~/.bashrc
sudo reboot

Bluetooth controller

Use the command below to check the Bluetooth device name and MAC address.

hcitool dev
Devices:
hci0 00:1A:7D:DA:71:13
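If the MAC address is needed inside a script, for example to fill in HOST in the examples further down, the output above can be parsed. A minimal sketch, assuming the output format shown here; `parse_hcitool_dev` is a hypothetical helper name.

```python
import re

# One device per line: an hciN name followed by a colon-separated MAC address.
_DEVICE_LINE = re.compile(
    r"^\s*(hci\d+)\s+((?:[0-9A-Fa-f]{2}:){5}[0-9A-Fa-f]{2})\s*$"
)


def parse_hcitool_dev(output):
    """Return a list of (device_name, mac_address) tuples found in the output."""
    devices = []
    for line in output.splitlines():
        match = _DEVICE_LINE.match(line)
        if match:
            devices.append((match.group(1), match.group(2)))
    return devices
```

The text to parse can be obtained with `subprocess.run(["hcitool", "dev"], capture_output=True, text=True).stdout`.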

The command below enables page and inquiry scan on the Bluetooth device hci0, so that other devices can discover and connect to it.

sudo hciconfig hci0 piscan

bluetoothctl

The bluetoothctl command can be used to control Bluetooth devices.

bluetoothctl
[NEW] Controller 18:1D:EA:36:DA:89 hhk7734 [default]
Agent registered
[bluetooth]# help
Menu main:
Available commands:
-------------------
advertise Advertise Options Submenu
scan Scan Options Submenu
gatt Generic Attribute Submenu
list List available controllers
show [ctrl] Controller information
select <ctrl> Select default controller
devices List available devices
paired-devices List paired devices
system-alias <name> Set controller alias
reset-alias Reset controller alias
power <on/off> Set controller power
pairable <on/off> Set controller pairable mode
discoverable <on/off> Set controller discoverable mode
agent <on/off/capability> Enable/disable agent with given capability
default-agent Set agent as the default one
advertise <on/off/type> Enable/disable advertising with given type
set-alias <alias> Set device alias
scan <on/off> Scan for devices
info [dev] Device information
pair [dev] Pair with device
trust [dev] Trust device
untrust [dev] Untrust device
block [dev] Block device
unblock [dev] Unblock device
remove <dev> Remove device
connect <dev> Connect device
disconnect [dev] Disconnect device
menu <name> Select submenu
version Display version
quit Quit program
exit Quit program
help Display help about this program
[bluetooth]#
info

If no information is shown, run sudo bluetoothctl to control the Bluetooth device instead.

Examples

server - client

server

import bluetooth
import signal
import sys

HOST = ""  # You can also enter the Bluetooth controller's MAC address directly
PORT = bluetooth.PORT_ANY
UUID = "94f39d29-7d6d-437d-973b-fba39e49d4ee"


def signal_handler(sig, frame):
    # On Ctrl+C, close whichever sockets exist and exit
    try:
        connected_socket.close()
    except NameError:
        pass  # No client has connected yet
    server_socket.close()
    sys.exit()


signal.signal(signal.SIGINT, signal_handler)

# Create the Bluetooth server socket
server_socket = bluetooth.BluetoothSocket(bluetooth.RFCOMM)
server_socket.bind((HOST, PORT))
server_socket.listen(1)

port = server_socket.getsockname()[1]
print("port:", port)

# Advertise the Bluetooth service
bluetooth.advertise_service(
    server_socket,
    name="server",
    service_id=UUID,
    service_classes=[UUID, bluetooth.SERIAL_PORT_CLASS],
    profiles=[bluetooth.SERIAL_PORT_PROFILE],
)

# Wait for a client to connect
connected_socket, client_address = server_socket.accept()

try:
    # Echo everything the client sends back to it
    while True:
        data = connected_socket.recv(1024)
        print("client:", data)
        connected_socket.send(data)
except OSError:
    pass  # The connection was closed or failed

connected_socket.close()
server_socket.close()
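The receive-and-echo loop in the server can be tried without any Bluetooth hardware by running the same logic over a local socket pair. This is only a sketch for testing the loop: `echo_loop` and `echo_once` are hypothetical names, and `socket.socketpair()` stands in for the RFCOMM connection.

```python
import socket
import threading


def echo_loop(conn):
    """Echo every received chunk back until the peer closes the connection."""
    try:
        while True:
            data = conn.recv(1024)
            if not data:  # Empty bytes: the peer closed the connection
                break
            conn.sendall(data)
    finally:
        conn.close()


def echo_once(payload):
    """Send payload through a local socket pair and return the echoed bytes."""
    server_side, client_side = socket.socketpair()
    worker = threading.Thread(target=echo_loop, args=(server_side,), daemon=True)
    worker.start()
    try:
        client_side.sendall(payload)
        reply = client_side.recv(1024)
    finally:
        client_side.close()
    worker.join(timeout=1)
    return reply
```

With a small payload such as `b"ping"`, `echo_once` returns the same bytes that were sent.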

client

import bluetooth
import signal
import sys

# Server information
HOST = ""  # MAC address of the server's Bluetooth controller
UUID = "94f39d29-7d6d-437d-973b-fba39e49d4ee"


def signal_handler(sig, frame):
    try:
        client_socket.close()
    except NameError:
        pass  # The client socket has not been created yet
    sys.exit()


signal.signal(signal.SIGINT, signal_handler)

# Use find_service when the port is unknown but the name or UUID is known.
# Setting only address and leaving the rest as None can return several results.
# name=None does not search for services whose name is None; the name is simply
# not used as a filter, so any service matching the other fields is included.
# Each result is a dictionary with the keys 'host', 'name', 'description',
# 'provider', 'protocol', 'port', 'service-classes', 'profiles', 'service-id'.
# Knowing the address plus one other field almost always narrows it to one result.
services = bluetooth.find_service(name=None, uuid=UUID, address=HOST)

if len(services) == 0:
    print("Could not find the device.")
    sys.exit()

service = services[0]
port = service["port"]
print("port:", port)

# Create the Bluetooth client socket
client_socket = bluetooth.BluetoothSocket(bluetooth.RFCOMM)

# Connect to the server
client_socket.connect((HOST, port))

try:
    while True:
        data = input("client : ")
        if data:
            client_socket.send(data.encode())
            data = client_socket.recv(1024)
            print("server :", data)
except (OSError, EOFError):
    pass  # The connection was closed or input ended

client_socket.close()
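The client above simply takes `services[0]`. When a search returns several entries, a small filter over the result dictionaries can make the choice explicit. A minimal sketch: `pick_service` is a hypothetical helper, and it assumes the 'name' and 'protocol' values are plain strings such as "server" and "RFCOMM".

```python
def pick_service(services, name=None, protocol="RFCOMM"):
    """Return the first service dict matching the filters, or None.

    A filter set to None is ignored, mirroring how find_service treats None.
    """
    for service in services:
        if protocol is not None and service.get("protocol") != protocol:
            continue
        if name is not None and service.get("name") != name:
            continue
        return service
    return None
```

It could be called as `pick_service(services, name="server")`, with the port then read from the returned dictionary's "port" key.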

Send/receive thread

import bluetooth
import threading
import queue


class Bluetooth_socket(threading.Thread):
    def __init__(self):
        super().__init__()
        self._host = ""
        self._port = bluetooth.PORT_ANY
        self._uuid = "00000001-0000-1000-8000-00805F9B34FB"

        # Queue for storing received data
        self._recv_queue = queue.Queue()

        self.daemon = True

        # Create the Bluetooth server socket
        self._socket = bluetooth.BluetoothSocket(bluetooth.RFCOMM)
        self._socket.bind((self._host, self._port))
        self._socket.listen(1)

        bluetooth.advertise_service(
            self._socket,
            name="bluetooth_server",
            service_id=self._uuid,
            service_classes=[self._uuid, bluetooth.SERIAL_PORT_CLASS],
            profiles=[bluetooth.SERIAL_PORT_PROFILE],
        )

        self._connected_socket = None
        self._connection_start_callback = None
        self._connection_start_callback_args = ()
        self._connection_stop_callback = None
        self._connection_stop_callback_args = ()
        self._is_connected = False

    def run(self):
        while True:
            # Wait for a Bluetooth connection
            self._connected_socket, client_address = self._socket.accept()
            self._is_connected = True
            self._start_callback()

            try:
                while True:
                    data = self._connected_socket.recv(1024)
                    self._recv_queue.put(data)
            except OSError:
                pass  # The connection was closed or failed

            self._stop_callback()
            self._is_connected = False
            self._connected_socket.close()

    def close(self):
        if self._connected_socket is not None:
            self._connected_socket.close()
        self._socket.close()

    # Return one received item, or None if the queue is empty
    def recv(self):
        try:
            data = self._recv_queue.get(block=False)
        except queue.Empty:
            return None
        return data

    # Send data if a client is connected
    def send(self, data):
        if self._is_connected:
            self._connected_socket.send(data)

    # Set the callback invoked when a connection starts
    def set_connection_start_callback(self, callback=None, args=()):
        self._connection_start_callback = callback
        self._connection_start_callback_args = args

    def _start_callback(self):
        if self._connection_start_callback is not None:
            self._connection_start_callback(
                *self._connection_start_callback_args
            )

    # Set the callback invoked when a connection ends
    def set_connection_stop_callback(self, callback=None, args=()):
        self._connection_stop_callback = callback
        self._connection_stop_callback_args = args

    def _stop_callback(self):
        if self._connection_stop_callback is not None:
            self._connection_stop_callback(*self._connection_stop_callback_args)

    def is_connected(self):
        return self._is_connected
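Because `recv()` in the class above is non-blocking and returns `None` when the queue is empty, the main thread has to poll it. A minimal sketch of such a polling helper follows; `drain_received` and its `limit` parameter are hypothetical names, and the helper works with any object that offers the same `recv()` behaviour.

```python
def drain_received(bt_socket, limit=100):
    """Collect queued messages by calling recv() until it returns None.

    limit caps how many messages are taken in one call so that a busy
    sender cannot keep the caller in this loop indefinitely.
    """
    messages = []
    while len(messages) < limit:
        data = bt_socket.recv()
        if data is None:
            break
        messages.append(data)
    return messages
```

In a typical program, a `Bluetooth_socket` instance would be created, its callbacks registered with `set_connection_start_callback` and `set_connection_stop_callback`, and `start()` called once; the main loop can then call `drain_received` periodically and reply with `send()`.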
