Skip to content
Commits on Source (2)
# [1.0.0-cpp-classes.2](https://gitlab.provvedo.com/provvedo/esp-modbus/compare/v1.0.0-cpp-classes.1...v1.0.0-cpp-classes.2) (2026-01-18)
### Bug Fixes
* update the test tool with textual and a bit more functionality. including storing the test parameters ([7993b04](https://gitlab.provvedo.com/provvedo/esp-modbus/commit/7993b0407b6fa8f06e2ca4cf329e9bbca1bc09f2))
# 1.0.0-cpp-classes.1 (2026-01-15)
......
"""
Shared controller data definitions for ESP-Modbus tools.
This module provides:
- Register address constants matching the C++ ControllerData struct
- Data type conversion utilities
- ControllerData dataclass for parsed data representation
- Mode flag definitions and formatting
"""
import struct
from dataclasses import dataclass, field
from typing import List
# Configuration constants (must match ServoConfig.h)
SERVO_COUNT = 1
SCALES_COUNT = 4
MODBUS_SLAVE_ID = 17
# Register offsets (calculated from ControllerData struct layout)
REG_EXECUTION_INTERVAL = 0
REG_EXECUTION_INTERVAL_PREV = 2
REG_EXECUTION_INTERVAL_CURR = 4
REG_EXECUTION_CYCLES = 6
REG_CPU_FREQ_HZ = 8
# FastData starts at offset 10
REG_FAST_SERVO_CURRENT = 10
REG_FAST_SERVO_DESIRED = 10 + SERVO_COUNT * 2
REG_FAST_SERVO_MODE = REG_FAST_SERVO_DESIRED + SERVO_COUNT * 2
REG_FAST_SCALE_CURRENT = REG_FAST_SERVO_MODE + 2
REG_FAST_CYCLES = REG_FAST_SCALE_CURRENT + SCALES_COUNT * 2
# Control pins after FastData
REG_PINS = REG_FAST_CYCLES + 2 # usrLedPin, enaPin, ttlSelPin, unused (4 bytes)
# ServoAxis[0] starts after pins
REG_SERVO0_BASE = REG_PINS + 2
REG_SERVO0_MAX_SPEED = REG_SERVO0_BASE
REG_SERVO0_CURRENT_SPEED = REG_SERVO0_BASE + 2
REG_SERVO0_JOG_SPEED = REG_SERVO0_BASE + 4
REG_SERVO0_REF_SPEED_FWD = REG_SERVO0_BASE + 6
REG_SERVO0_REF_SPEED_REV = REG_SERVO0_BASE + 8
REG_SERVO0_MAX_ACCEL = REG_SERVO0_BASE + 10
REG_SERVO0_CURRENT_ACCEL = REG_SERVO0_BASE + 12
REG_SERVO0_JERK = REG_SERVO0_BASE + 14
REG_SERVO0_STEPS_TO_GO = REG_SERVO0_BASE + 16
REG_SERVO0_PREVIOUS_STEPS = REG_SERVO0_BASE + 18
REG_SERVO0_CURRENT_STEPS = REG_SERVO0_BASE + 20
REG_SERVO0_DESIRED_STEPS = REG_SERVO0_BASE + 22
REG_SERVO0_REF_DELAY = REG_SERVO0_BASE + 24
REG_SERVO0_REF_DELAY_CTR = REG_SERVO0_BASE + 26
REG_SERVO0_CURRENT_DIR = REG_SERVO0_BASE + 28
REG_SERVO0_PREVIOUS_DIR = REG_SERVO0_BASE + 30
REG_SERVO0_PINS = REG_SERVO0_BASE + 32 # stepPin, dirPin, referencePin, unused
REG_SERVO0_MODE = REG_SERVO0_BASE + 34
REG_SERVO0_SYNC_SCALE_IDX = REG_SERVO0_BASE + 36
REG_SERVO0_SYNC_RATIO_NUM = REG_SERVO0_BASE + 38
REG_SERVO0_SYNC_RATIO_DEN = REG_SERVO0_BASE + 40
REG_SERVO0_SYNC_DELTA_POS = REG_SERVO0_BASE + 42 # DeltaPosError: 5 x int32 = 10 regs
REG_SERVO0_SPEED_DELTA_POS = REG_SERVO0_BASE + 52 # DeltaPosError: 5 x int32 = 10 regs
# Size of one ServoAxis in registers
SERVO_AXIS_SIZE = 62
# After all ServoAxis entries
REG_SCALE_POSITIONS = REG_SERVO0_BASE + SERVO_AXIS_SIZE * SERVO_COUNT
REG_SERVO_CYCLES = REG_SCALE_POSITIONS + SCALES_COUNT * 2
REG_SERVO_CYCLES_CTR = REG_SERVO_CYCLES + 1
REG_TTL_SEL = REG_SERVO_CYCLES_CTR + 1
# Total registers
TOTAL_REGISTERS = REG_TTL_SEL + 2
# GlobalMode flags
class GlobalMode:
ENABLE = 0x0001
MOTION_HOLD = 0x0002
# ServoMode flags
class ServoMode:
JOG = 0x0002
POSITION = 0x0004
SYNC = 0x0008
REF_REQUEST = 0x0010
REF_FORWARD = 0x0020
REF_REVERSE = 0x0040
REF_DONE = 0x0080
REF_ERROR = 0x0100
# Data type conversion utilities
def regs_to_float(regs: List[int], offset: int = 0) -> float:
"""Convert two 16-bit registers to a float (little-endian)."""
packed = struct.pack('<HH', regs[offset], regs[offset + 1])
return struct.unpack('<f', packed)[0]
def regs_to_int32(regs: List[int], offset: int = 0) -> int:
"""Convert two 16-bit registers to a signed int32 (little-endian)."""
packed = struct.pack('<HH', regs[offset], regs[offset + 1])
return struct.unpack('<i', packed)[0]
def regs_to_uint32(regs: List[int], offset: int = 0) -> int:
"""Convert two 16-bit registers to an unsigned uint32 (little-endian)."""
packed = struct.pack('<HH', regs[offset], regs[offset + 1])
return struct.unpack('<I', packed)[0]
def float_to_regs(value: float) -> tuple[int, int]:
"""Convert a float to two 16-bit registers (little-endian)."""
packed = struct.pack('<f', value)
r0 = struct.unpack('<H', packed[0:2])[0]
r1 = struct.unpack('<H', packed[2:4])[0]
return r0, r1
def int32_to_regs(value: int) -> tuple[int, int]:
"""Convert a signed int32 to two 16-bit registers (little-endian)."""
packed = struct.pack('<i', value)
r0 = struct.unpack('<H', packed[0:2])[0]
r1 = struct.unpack('<H', packed[2:4])[0]
return r0, r1
def uint32_to_regs(value: int) -> tuple[int, int]:
"""Convert an unsigned uint32 to two 16-bit registers (little-endian)."""
packed = struct.pack('<I', value)
r0 = struct.unpack('<H', packed[0:2])[0]
r1 = struct.unpack('<H', packed[2:4])[0]
return r0, r1
def bytes_to_regs(data: bytes) -> List[int]:
"""Convert raw bytes to list of 16-bit registers (little-endian)."""
regs = []
for i in range(0, len(data), 2):
if i + 1 < len(data):
regs.append(struct.unpack('<H', data[i:i+2])[0])
return regs
@dataclass
class ControllerData:
"""Parsed representation of the Modbus register data."""
# Timing
execution_interval: int = 0
execution_interval_prev: int = 0
execution_interval_curr: int = 0
execution_cycles: int = 0
cpu_freq_hz: int = 0
# FastData
fast_servo_current: List[int] = field(default_factory=lambda: [0] * SERVO_COUNT)
fast_servo_desired: List[int] = field(default_factory=lambda: [0] * SERVO_COUNT)
fast_servo_mode: int = 0
fast_scale_current: List[int] = field(default_factory=lambda: [0] * SCALES_COUNT)
fast_cycles: int = 0
# Pins
usr_led_pin: int = 0
ena_pin: int = 0
ttl_sel_pin: int = 0
# ServoAxis[0]
servo_max_speed: float = 0.0
servo_current_speed: float = 0.0
servo_jog_speed: float = 0.0
servo_ref_speed_fwd: float = 0.0
servo_ref_speed_rev: float = 0.0
servo_max_accel: float = 0.0
servo_current_accel: float = 0.0
servo_jerk: float = 0.0
servo_steps_to_go: int = 0
servo_previous_steps: int = 0
servo_current_steps: int = 0
servo_desired_steps: int = 0
servo_ref_delay: int = 0
servo_ref_delay_ctr: int = 0
servo_current_dir: int = 0
servo_previous_dir: int = 0
servo_step_pin: int = 0
servo_dir_pin: int = 0
servo_ref_pin: int = 0
servo_mode: int = 0
servo_sync_scale_idx: int = 0
servo_sync_ratio_num: int = 0
servo_sync_ratio_den: int = 0
# Additional
servo_cycles: int = 0
servo_cycles_ctr: int = 0
ttl_sel: int = 0
@classmethod
def from_registers(cls, regs: List[int]) -> "ControllerData":
"""Parse raw Modbus registers into ControllerData structure."""
data = cls()
data.execution_interval = regs_to_uint32(regs, REG_EXECUTION_INTERVAL)
data.execution_interval_prev = regs_to_uint32(regs, REG_EXECUTION_INTERVAL_PREV)
data.execution_interval_curr = regs_to_uint32(regs, REG_EXECUTION_INTERVAL_CURR)
data.execution_cycles = regs_to_uint32(regs, REG_EXECUTION_CYCLES)
data.cpu_freq_hz = regs_to_uint32(regs, REG_CPU_FREQ_HZ)
# FastData
for i in range(SERVO_COUNT):
offset = REG_FAST_SERVO_CURRENT + i * 2
data.fast_servo_current[i] = regs_to_uint32(regs, offset)
for i in range(SERVO_COUNT):
offset = REG_FAST_SERVO_DESIRED + i * 2
data.fast_servo_desired[i] = regs_to_uint32(regs, offset)
data.fast_servo_mode = regs_to_uint32(regs, REG_FAST_SERVO_MODE)
for i in range(SCALES_COUNT):
offset = REG_FAST_SCALE_CURRENT + i * 2
data.fast_scale_current[i] = regs_to_int32(regs, offset)
data.fast_cycles = regs_to_uint32(regs, REG_FAST_CYCLES)
# Pins (4 bytes packed into 2 registers)
pins_data = struct.pack('<HH', regs[REG_PINS], regs[REG_PINS + 1])
data.usr_led_pin = pins_data[0]
data.ena_pin = pins_data[1]
data.ttl_sel_pin = pins_data[2]
# ServoAxis[0]
data.servo_max_speed = regs_to_float(regs, REG_SERVO0_MAX_SPEED)
data.servo_current_speed = regs_to_float(regs, REG_SERVO0_CURRENT_SPEED)
data.servo_jog_speed = regs_to_float(regs, REG_SERVO0_JOG_SPEED)
data.servo_ref_speed_fwd = regs_to_float(regs, REG_SERVO0_REF_SPEED_FWD)
data.servo_ref_speed_rev = regs_to_float(regs, REG_SERVO0_REF_SPEED_REV)
data.servo_max_accel = regs_to_float(regs, REG_SERVO0_MAX_ACCEL)
data.servo_current_accel = regs_to_float(regs, REG_SERVO0_CURRENT_ACCEL)
data.servo_jerk = regs_to_float(regs, REG_SERVO0_JERK)
data.servo_steps_to_go = regs_to_int32(regs, REG_SERVO0_STEPS_TO_GO)
data.servo_previous_steps = regs_to_uint32(regs, REG_SERVO0_PREVIOUS_STEPS)
data.servo_current_steps = regs_to_uint32(regs, REG_SERVO0_CURRENT_STEPS)
data.servo_desired_steps = regs_to_uint32(regs, REG_SERVO0_DESIRED_STEPS)
data.servo_ref_delay = regs_to_uint32(regs, REG_SERVO0_REF_DELAY)
data.servo_ref_delay_ctr = regs_to_uint32(regs, REG_SERVO0_REF_DELAY_CTR)
data.servo_current_dir = regs_to_int32(regs, REG_SERVO0_CURRENT_DIR)
data.servo_previous_dir = regs_to_int32(regs, REG_SERVO0_PREVIOUS_DIR)
# Servo pins (4 bytes packed)
servo_pins = struct.pack('<HH', regs[REG_SERVO0_PINS], regs[REG_SERVO0_PINS + 1])
data.servo_step_pin = servo_pins[0]
data.servo_dir_pin = servo_pins[1]
data.servo_ref_pin = servo_pins[2]
data.servo_mode = regs_to_uint32(regs, REG_SERVO0_MODE)
data.servo_sync_scale_idx = regs_to_uint32(regs, REG_SERVO0_SYNC_SCALE_IDX)
data.servo_sync_ratio_num = regs_to_int32(regs, REG_SERVO0_SYNC_RATIO_NUM)
data.servo_sync_ratio_den = regs_to_int32(regs, REG_SERVO0_SYNC_RATIO_DEN)
# Additional fields after ServoAxis
data.servo_cycles = regs[REG_SERVO_CYCLES]
data.servo_cycles_ctr = regs[REG_SERVO_CYCLES_CTR]
data.ttl_sel = regs_to_uint32(regs, REG_TTL_SEL)
return data
@property
def loop_time_us(self) -> float:
"""Calculate loop time in microseconds."""
if self.cpu_freq_hz > 0:
return self.execution_interval / (self.cpu_freq_hz / 1e6)
return 0.0
@property
def loop_frequency_hz(self) -> float:
"""Calculate loop frequency in Hz."""
loop_time = self.loop_time_us
if loop_time > 0:
return 1e6 / loop_time
return 0.0
def format_mode_flags(global_mode: int, servo_mode: int) -> str:
"""Format mode flags as readable string."""
gm = []
if global_mode & GlobalMode.ENABLE:
gm.append("EN")
if global_mode & GlobalMode.MOTION_HOLD:
gm.append("HOLD")
sm = []
if servo_mode & ServoMode.JOG:
sm.append("JOG")
if servo_mode & ServoMode.POSITION:
sm.append("POS")
if servo_mode & ServoMode.SYNC:
sm.append("SYNC")
if servo_mode & ServoMode.REF_REQUEST:
sm.append("REF_REQ")
if servo_mode & ServoMode.REF_FORWARD:
sm.append("REF_FWD")
if servo_mode & ServoMode.REF_REVERSE:
sm.append("REF_REV")
if servo_mode & ServoMode.REF_DONE:
sm.append("REF_OK")
if servo_mode & ServoMode.REF_ERROR:
sm.append("REF_ERR")
return f"G:[{','.join(gm) or '-'}] S:[{','.join(sm) or '-'}]"
def dump_controller_data(data: ControllerData) -> str:
"""Format ControllerData as a detailed dump string."""
lines = []
lines.append("=" * 70)
lines.append("CONTROLLER DATA DUMP")
lines.append("=" * 70)
lines.append("\n--- Timing ---")
lines.append(f" Execution Interval: {data.execution_interval:,} cycles")
lines.append(f" Execution Cycles: {data.execution_cycles:,} cycles")
lines.append(f" CPU Frequency: {data.cpu_freq_hz:,} Hz ({data.cpu_freq_hz/1e6:.0f} MHz)")
lines.append(f" Loop Time: {data.loop_time_us:.2f} us ({data.loop_frequency_hz:.0f} Hz)")
lines.append("\n--- FastData ---")
lines.append(f" Servo Current: {data.fast_servo_current}")
lines.append(f" Servo Desired: {data.fast_servo_desired}")
lines.append(f" Global Mode: 0x{data.fast_servo_mode:04X} ({format_mode_flags(data.fast_servo_mode, 0)})")
lines.append(f" Scale Current: {data.fast_scale_current}")
lines.append(f" Fast Cycles: {data.fast_cycles:,}")
lines.append("\n--- Pins ---")
lines.append(f" LED Pin: {data.usr_led_pin}")
lines.append(f" Enable Pin: {data.ena_pin}")
lines.append(f" TTL Select Pin: {data.ttl_sel_pin}")
lines.append("\n--- Servo Axis 0 ---")
lines.append(f" Max Speed: {data.servo_max_speed:.1f} steps/s")
lines.append(f" Current Speed: {data.servo_current_speed:.1f} steps/s")
lines.append(f" Jog Speed: {data.servo_jog_speed:.1f} steps/s")
lines.append(f" Ref Speed Fwd: {data.servo_ref_speed_fwd:.1f} steps/s")
lines.append(f" Ref Speed Rev: {data.servo_ref_speed_rev:.1f} steps/s")
lines.append(f" Max Acceleration: {data.servo_max_accel:.1f} steps/s^2")
lines.append(f" Current Acceleration: {data.servo_current_accel:.1f} steps/s^2")
lines.append(f" Jerk: {data.servo_jerk:.1f} steps/s^3")
lines.append(f" Steps To Go: {data.servo_steps_to_go:,}")
lines.append(f" Previous Steps: {data.servo_previous_steps:,}")
lines.append(f" Current Steps: {data.servo_current_steps:,}")
lines.append(f" Desired Steps: {data.servo_desired_steps:,}")
lines.append(f" Reference Delay: {data.servo_ref_delay}")
lines.append(f" Reference Delay Counter: {data.servo_ref_delay_ctr}")
lines.append(f" Current Direction: {data.servo_current_dir}")
lines.append(f" Previous Direction: {data.servo_previous_dir}")
lines.append(f" Step Pin: {data.servo_step_pin}")
lines.append(f" Dir Pin: {data.servo_dir_pin}")
lines.append(f" Reference Pin: {data.servo_ref_pin}")
lines.append(f" Servo Mode: 0x{data.servo_mode:04X} ({format_mode_flags(0, data.servo_mode)})")
lines.append(f" Sync Scale Index: {data.servo_sync_scale_idx}")
lines.append(f" Sync Ratio: {data.servo_sync_ratio_num}/{data.servo_sync_ratio_den}")
lines.append("\n--- Additional ---")
lines.append(f" Servo Cycles: {data.servo_cycles}")
lines.append(f" Servo Cycles Counter: {data.servo_cycles_ctr}")
lines.append(f" TTL Select: {data.ttl_sel}")
lines.append("=" * 70)
return "\n".join(lines)
\ No newline at end of file
#!/usr/bin/env python3
"""
ESP-Modbus Test Tools - Entry Point
A modern text-based user interface for running various tests against the ESP32 controller.
"""
from ui import ESPModbusApp
def main():
"""Main entry point."""
app = ESPModbusApp()
app.run()
if __name__ == "__main__":
main()
\ No newline at end of file
"""
Async Modbus client for ESP-Modbus tools.
Provides a unified async interface for Modbus RTU communication over:
- Serial/UART
- Bluetooth RFCOMM
- TCP
Uses pymodbus for the Modbus protocol implementation.
"""
import asyncio
import socket
import struct
import time
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Optional, List, Tuple
from pymodbus.client import AsyncModbusSerialClient, AsyncModbusTcpClient
from pymodbus.framer import FramerType
from controller import ControllerData, TOTAL_REGISTERS, MODBUS_SLAVE_ID
# Bluetooth constants (Linux BlueZ)
BTPROTO_RFCOMM = 3
@dataclass
class ModbusResult:
"""Result of a Modbus operation."""
success: bool
data: Optional[List[int]] = None
latency_ms: float = 0.0
error: Optional[str] = None
class AsyncModbusClient(ABC):
"""Abstract base class for async Modbus clients."""
def __init__(self, slave_id: int = MODBUS_SLAVE_ID):
self.slave_id = slave_id
@abstractmethod
async def connect(self) -> bool:
"""Connect to the Modbus device."""
pass
@abstractmethod
async def close(self) -> None:
"""Close the connection."""
pass
@abstractmethod
async def read_registers(self, address: int, count: int) -> ModbusResult:
"""Read holding registers."""
pass
@abstractmethod
async def write_register(self, address: int, value: int) -> ModbusResult:
"""Write a single register."""
pass
@abstractmethod
async def write_registers(self, address: int, values: List[int]) -> ModbusResult:
"""Write multiple registers."""
pass
@property
@abstractmethod
def is_connected(self) -> bool:
"""Check if connected."""
pass
@property
@abstractmethod
def name(self) -> str:
"""Get a descriptive name for the connection."""
pass
async def read_all_data(self) -> Tuple[Optional[ControllerData], float]:
"""Read and parse entire ControllerData structure."""
result = await self.read_registers(0, TOTAL_REGISTERS)
if not result.success or result.data is None or len(result.data) < TOTAL_REGISTERS:
return None, result.latency_ms
return ControllerData.from_registers(result.data), result.latency_ms
class AsyncSerialClient(AsyncModbusClient):
"""Async Modbus RTU client over serial/UART."""
def __init__(
self,
port: str,
baudrate: int = 115200,
slave_id: int = MODBUS_SLAVE_ID,
timeout: float = 1.0
):
super().__init__(slave_id)
self.port = port
self.baudrate = baudrate
self.timeout = timeout
self._client: Optional[AsyncModbusSerialClient] = None
async def connect(self) -> bool:
self._client = AsyncModbusSerialClient(
port=self.port,
baudrate=self.baudrate,
framer=FramerType.RTU,
timeout=self.timeout,
)
return await self._client.connect()
async def close(self) -> None:
if self._client:
self._client.close()
self._client = None
async def read_registers(self, address: int, count: int) -> ModbusResult:
if not self._client:
return ModbusResult(success=False, error="Not connected")
start = time.perf_counter()
try:
result = await self._client.read_holding_registers(
address=address,
count=count,
device_id=self.slave_id
)
latency = (time.perf_counter() - start) * 1000
if result.isError():
return ModbusResult(success=False, latency_ms=latency, error=str(result))
return ModbusResult(success=True, data=list(result.registers), latency_ms=latency)
except Exception as e:
latency = (time.perf_counter() - start) * 1000
return ModbusResult(success=False, latency_ms=latency, error=str(e))
async def write_register(self, address: int, value: int) -> ModbusResult:
if not self._client:
return ModbusResult(success=False, error="Not connected")
start = time.perf_counter()
try:
result = await self._client.write_register(
address=address,
value=value,
device_id=self.slave_id
)
latency = (time.perf_counter() - start) * 1000
if result.isError():
return ModbusResult(success=False, latency_ms=latency, error=str(result))
return ModbusResult(success=True, latency_ms=latency)
except Exception as e:
latency = (time.perf_counter() - start) * 1000
return ModbusResult(success=False, latency_ms=latency, error=str(e))
async def write_registers(self, address: int, values: List[int]) -> ModbusResult:
if not self._client:
return ModbusResult(success=False, error="Not connected")
start = time.perf_counter()
try:
result = await self._client.write_registers(
address=address,
values=values,
device_id=self.slave_id
)
latency = (time.perf_counter() - start) * 1000
if result.isError():
return ModbusResult(success=False, latency_ms=latency, error=str(result))
return ModbusResult(success=True, latency_ms=latency)
except Exception as e:
latency = (time.perf_counter() - start) * 1000
return ModbusResult(success=False, latency_ms=latency, error=str(e))
@property
def is_connected(self) -> bool:
return self._client is not None and self._client.connected
@property
def name(self) -> str:
return f"UART ({self.port} @ {self.baudrate})"
class AsyncTcpClient(AsyncModbusClient):
"""Async Modbus RTU over TCP client."""
def __init__(
self,
host: str,
port: int = 502,
slave_id: int = MODBUS_SLAVE_ID,
timeout: float = 2.0
):
super().__init__(slave_id)
self.host = host
self.port = port
self.timeout = timeout
self._client: Optional[AsyncModbusTcpClient] = None
async def connect(self) -> bool:
self._client = AsyncModbusTcpClient(
host=self.host,
port=self.port,
framer=FramerType.RTU,
timeout=self.timeout,
)
return await self._client.connect()
async def close(self) -> None:
if self._client:
self._client.close()
self._client = None
async def read_registers(self, address: int, count: int) -> ModbusResult:
if not self._client:
return ModbusResult(success=False, error="Not connected")
start = time.perf_counter()
try:
result = await self._client.read_holding_registers(
address=address,
count=count,
device_id=self.slave_id
)
latency = (time.perf_counter() - start) * 1000
if result.isError():
return ModbusResult(success=False, latency_ms=latency, error=str(result))
return ModbusResult(success=True, data=list(result.registers), latency_ms=latency)
except Exception as e:
latency = (time.perf_counter() - start) * 1000
return ModbusResult(success=False, latency_ms=latency, error=str(e))
async def write_register(self, address: int, value: int) -> ModbusResult:
if not self._client:
return ModbusResult(success=False, error="Not connected")
start = time.perf_counter()
try:
result = await self._client.write_register(
address=address,
value=value,
device_id=self.slave_id
)
latency = (time.perf_counter() - start) * 1000
if result.isError():
return ModbusResult(success=False, latency_ms=latency, error=str(result))
return ModbusResult(success=True, latency_ms=latency)
except Exception as e:
latency = (time.perf_counter() - start) * 1000
return ModbusResult(success=False, latency_ms=latency, error=str(e))
async def write_registers(self, address: int, values: List[int]) -> ModbusResult:
if not self._client:
return ModbusResult(success=False, error="Not connected")
start = time.perf_counter()
try:
result = await self._client.write_registers(
address=address,
values=values,
device_id=self.slave_id
)
latency = (time.perf_counter() - start) * 1000
if result.isError():
return ModbusResult(success=False, latency_ms=latency, error=str(result))
return ModbusResult(success=True, latency_ms=latency)
except Exception as e:
latency = (time.perf_counter() - start) * 1000
return ModbusResult(success=False, latency_ms=latency, error=str(e))
@property
def is_connected(self) -> bool:
return self._client is not None and self._client.connected
@property
def name(self) -> str:
return f"TCP ({self.host}:{self.port})"
def _modbus_crc(data: bytes) -> bytes:
"""Calculate Modbus CRC-16."""
crc = 0xFFFF
for byte in data:
crc ^= byte
for _ in range(8):
if crc & 1:
crc = (crc >> 1) ^ 0xA001
else:
crc >>= 1
return struct.pack('<H', crc)
class AsyncBluetoothClient(AsyncModbusClient):
"""Async Modbus RTU over Bluetooth RFCOMM.
Note: This implementation uses raw socket with manual RTU framing
since pymodbus doesn't directly support Bluetooth sockets.
"""
def __init__(
self,
mac_address: str,
channel: int = 1,
slave_id: int = MODBUS_SLAVE_ID,
timeout: float = 1.0
):
super().__init__(slave_id)
self.mac_address = mac_address
self.channel = channel
self.timeout = timeout
self._sock: Optional[socket.socket] = None
self._connected = False
async def connect(self) -> bool:
loop = asyncio.get_event_loop()
try:
self._sock = socket.socket(socket.AF_BLUETOOTH, socket.SOCK_STREAM, BTPROTO_RFCOMM)
self._sock.setblocking(False)
await loop.sock_connect(self._sock, (self.mac_address, self.channel))
self._connected = True
return True
except Exception:
if self._sock:
self._sock.close()
self._sock = None
return False
async def close(self) -> None:
if self._sock:
self._sock.close()
self._sock = None
self._connected = False
async def _send_receive(self, request: bytes, expected_len: int) -> Tuple[Optional[bytes], float]:
"""Send request and receive response."""
loop = asyncio.get_event_loop()
start = time.perf_counter()
try:
await loop.sock_sendall(self._sock, request)
response = b''
while len(response) < expected_len:
try:
chunk = await asyncio.wait_for(
loop.sock_recv(self._sock, expected_len - len(response)),
timeout=self.timeout
)
if not chunk:
break
response += chunk
except asyncio.TimeoutError:
break
latency = (time.perf_counter() - start) * 1000
return response, latency
except Exception:
latency = (time.perf_counter() - start) * 1000
return None, latency
async def read_registers(self, address: int, count: int) -> ModbusResult:
if not self._sock:
return ModbusResult(success=False, error="Not connected")
# Build RTU frame: [slave_id, func(0x03), addr_hi, addr_lo, count_hi, count_lo, crc]
frame = struct.pack('>BBHH', self.slave_id, 0x03, address, count)
request = frame + _modbus_crc(frame)
expected_len = 3 + count * 2 + 2 # slave + func + byte_count + data + crc
response, latency = await self._send_receive(request, expected_len)
if response is None or len(response) < 5:
return ModbusResult(success=False, latency_ms=latency, error="No response")
# Verify CRC
received_crc = response[-2:]
calculated_crc = _modbus_crc(response[:-2])
if received_crc != calculated_crc:
return ModbusResult(success=False, latency_ms=latency, error="CRC error")
# Parse response
byte_count = response[2]
data = response[3:3 + byte_count]
values = []
for i in range(0, len(data), 2):
if i + 1 < len(data):
values.append(struct.unpack('>H', data[i:i + 2])[0])
return ModbusResult(success=True, data=values, latency_ms=latency)
async def write_register(self, address: int, value: int) -> ModbusResult:
if not self._sock:
return ModbusResult(success=False, error="Not connected")
# Build RTU frame: [slave_id, func(0x06), addr_hi, addr_lo, value_hi, value_lo, crc]
frame = struct.pack('>BBHH', self.slave_id, 0x06, address, value)
request = frame + _modbus_crc(frame)
response, latency = await self._send_receive(request, 8)
if response is None or len(response) < 6:
return ModbusResult(success=False, latency_ms=latency, error="No response")
return ModbusResult(success=True, latency_ms=latency)
async def write_registers(self, address: int, values: List[int]) -> ModbusResult:
if not self._sock:
return ModbusResult(success=False, error="Not connected")
# Build RTU frame: [slave_id, func(0x10), addr, count, byte_count, data..., crc]
count = len(values)
byte_count = count * 2
frame = struct.pack('>BBHHB', self.slave_id, 0x10, address, count, byte_count)
for v in values:
frame += struct.pack('>H', v)
request = frame + _modbus_crc(frame)
response, latency = await self._send_receive(request, 8)
if response is None or len(response) < 6:
return ModbusResult(success=False, latency_ms=latency, error="No response")
return ModbusResult(success=True, latency_ms=latency)
@property
def is_connected(self) -> bool:
return self._connected and self._sock is not None
@property
def name(self) -> str:
return f"Bluetooth ({self.mac_address})"
def create_client(
transport: str,
address: str,
slave_id: int = MODBUS_SLAVE_ID,
**kwargs
) -> AsyncModbusClient:
"""Factory function to create the appropriate client.
Args:
transport: One of 'uart', 'tcp', 'bluetooth' (or 'bt')
address: Port path (uart), host:port (tcp), or MAC address (bluetooth)
slave_id: Modbus slave ID
**kwargs: Additional arguments passed to client constructor
Returns:
AsyncModbusClient instance
"""
transport = transport.lower()
if transport == 'uart' or transport == 'serial':
baudrate = kwargs.get('baudrate', 115200)
return AsyncSerialClient(address, baudrate=baudrate, slave_id=slave_id)
elif transport == 'tcp':
if ':' in address:
host, port = address.rsplit(':', 1)
port = int(port)
else:
host = address
port = kwargs.get('port', 502)
return AsyncTcpClient(host, port=port, slave_id=slave_id)
elif transport in ('bluetooth', 'bt'):
channel = kwargs.get('channel', 1)
return AsyncBluetoothClient(address, channel=channel, slave_id=slave_id)
else:
raise ValueError(f"Unknown transport: {transport}")
\ No newline at end of file
#!/usr/bin/env python3
"""
Modbus RTU over Bluetooth SPP - Direct connection using Python's built-in socket.
No external Bluetooth library required (uses Linux BlueZ stack directly).
Usage: python modbus_bluetooth_direct.py <MAC_ADDRESS> [slave_id] [interval_ms] [channel]
Example: python modbus_bluetooth_direct.py AA:BB:CC:DD:EE:FF 17 100 1
Find your ESP32 MAC with: bluetoothctl devices
"""
import socket
import struct
import sys
import time
from collections import deque
# Bluetooth constants (from Linux kernel headers)
BTPROTO_RFCOMM = 3
def modbus_crc(data: bytes) -> bytes:
"""Calculate Modbus CRC-16."""
crc = 0xFFFF
for byte in data:
crc ^= byte
for _ in range(8):
if crc & 1:
crc = (crc >> 1) ^ 0xA001
else:
crc >>= 1
return struct.pack('<H', crc)
class ModbusBluetoothClient:
def __init__(self, mac_address: str, channel: int = 1, timeout: float = 0.3):
self.mac_address = mac_address
self.channel = channel
self.timeout = timeout
self.sock = None
def connect(self):
# Create Bluetooth RFCOMM socket
self.sock = socket.socket(socket.AF_BLUETOOTH, socket.SOCK_STREAM, BTPROTO_RFCOMM)
self.sock.settimeout(self.timeout)
print(f"Connecting to {self.mac_address} on channel {self.channel}...")
self.sock.connect((self.mac_address, self.channel))
print("Connected!")
def close(self):
if self.sock:
self.sock.close()
self.sock = None
def read_registers(self, slave_id: int, address: int, count: int) -> tuple:
"""Read holding registers. Returns (values, latency_ms) or (None, latency_ms) on error."""
frame = struct.pack('>BBHH', slave_id, 0x03, address, count)
frame_with_crc = frame + modbus_crc(frame)
start = time.perf_counter()
try:
self.sock.send(frame_with_crc)
# Expected response: slave_id(1) + func(1) + byte_count(1) + data(count*2) + crc(2)
expected_len = 3 + count * 2 + 2
response = b''
while len(response) < expected_len:
try:
chunk = self.sock.recv(expected_len - len(response))
if not chunk:
break
response += chunk
except socket.timeout:
break
latency = (time.perf_counter() - start) * 1000
if len(response) >= 5:
byte_count = response[2]
data = response[3:3+byte_count]
values = []
for i in range(0, len(data), 2):
if i + 1 < len(data):
values.append(struct.unpack('>H', data[i:i+2])[0])
return values, latency
return None, latency
except Exception as e:
latency = (time.perf_counter() - start) * 1000
return None, latency
def main():
if len(sys.argv) < 2:
print(f"Usage: {sys.argv[0]} <MAC_ADDRESS> [slave_id] [interval_ms] [channel]")
print()
print("Find your ESP32 MAC address with:")
print(" bluetoothctl devices")
print()
print(f"Example: {sys.argv[0]} AA:BB:CC:DD:EE:FF 17 100 1")
print()
print("Before running, make sure the device is paired and trusted:")
print(" bluetoothctl pair AA:BB:CC:DD:EE:FF")
print(" bluetoothctl trust AA:BB:CC:DD:EE:FF")
sys.exit(1)
mac_address = sys.argv[1]
slave_id = int(sys.argv[2]) if len(sys.argv) > 2 else 17
interval_ms = int(sys.argv[3]) if len(sys.argv) > 3 else 20
channel = int(sys.argv[4]) if len(sys.argv) > 4 else 1
register_count = 100
interval_s = interval_ms / 1000.0
# Bytes per transaction: request(8) + response(3 + count*2 + 2)
bytes_per_request = 8 + 3 + register_count * 2 + 2
print(f"Modbus Bluetooth Direct Test")
print(f"MAC: {mac_address}, Channel: {channel}, Slave ID: {slave_id}")
print(f"Reading {register_count} registers ({bytes_per_request} bytes/transaction)")
print(f"Interval: {interval_ms}ms ({1000/interval_ms:.1f} Hz)")
print("=" * 70)
print("Press Ctrl+C to stop and see statistics\n")
client = ModbusBluetoothClient(mac_address, channel=channel)
try:
client.connect()
except Exception as e:
print(f"Connection failed: {e}")
print("\nTroubleshooting:")
print(f" 1. Make sure the device is paired: bluetoothctl pair {mac_address}")
print(f" 2. Make sure the device is trusted: bluetoothctl trust {mac_address}")
print(" 3. Try a different channel (1-30), ESP32 SPP usually uses channel 1")
print(" 4. Check if Bluetooth service is running: systemctl status bluetooth")
sys.exit(1)
latencies = deque(maxlen=1000)
success_count = 0
error_count = 0
total_bytes = 0
start_time = time.time()
try:
while True:
loop_start = time.perf_counter()
values, latency = client.read_registers(slave_id, 0, register_count)
if values is not None:
success_count += 1
total_bytes += bytes_per_request
latencies.append(latency)
elapsed = time.time() - start_time
rate = success_count / elapsed if elapsed > 0 else 0
mbits = (total_bytes * 8) / (elapsed * 1_000) if elapsed > 0 else 0
# Only show first 2 register values
# reg0 = values[0] if len(values) > 0 else 0
# reg1 = values[1] if len(values) > 1 else 0
regs = str(values[:10])
print(f"\r[{elapsed:6.1f}s] {regs} | "
f"{latency:5.1f}ms | "
f"{rate:5.1f} msg/s | "
f"{mbits:5.3f} Kbit/s | "
f"OK:{success_count} Err:{error_count}",
end="", flush=True)
else:
error_count += 1
print(f"\r[Error] Timeout or invalid response (latency: {latency:.1f}ms)",
end="", flush=True)
elapsed_loop = time.perf_counter() - loop_start
sleep_time = interval_s - elapsed_loop
if sleep_time > 0:
time.sleep(sleep_time)
except KeyboardInterrupt:
print("\n\n" + "=" * 70)
print("STATISTICS")
print("=" * 70)
total_time = time.time() - start_time
total_requests = success_count + error_count
print(f"Duration: {total_time:.1f} seconds")
print(f"Total requests: {total_requests}")
if total_requests > 0:
print(f"Successful: {success_count} ({100*success_count/total_requests:.1f}%)")
print(f"Errors: {error_count} ({100*error_count/total_requests:.1f}%)")
if total_time > 0:
print(f"Actual rate: {total_requests/total_time:.1f} req/s")
print(f"Total bytes: {total_bytes:,} bytes")
avg_mbits = (total_bytes * 8) / (total_time * 1_000_000)
print(f"Avg bandwidth: {avg_mbits:.3f} Mbit/s")
if latencies:
lat_list = list(latencies)
lat_list.sort()
print(f"\nLatency (last {len(lat_list)} samples):")
print(f" Min: {min(lat_list):.2f} ms")
print(f" Max: {max(lat_list):.2f} ms")
print(f" Avg: {sum(lat_list)/len(lat_list):.2f} ms")
print(f" Median: {lat_list[len(lat_list)//2]:.2f} ms")
if len(lat_list) >= 20:
print(f" P95: {lat_list[int(len(lat_list)*0.95)]:.2f} ms")
if len(lat_list) >= 100:
print(f" P99: {lat_list[int(len(lat_list)*0.99)]:.2f} ms")
finally:
client.close()
print("\nConnection closed.")
if __name__ == '__main__':
main()
This diff is collapsed.
#!/usr/bin/env python3
"""
Modbus RTU over Serial test - continuously reads registers.
Works with UART, Bluetooth SPP, USB-to-serial adapters, etc.
Usage: python modbus_serial_test.py <port> [slave_id] [interval_ms] [baudrate]
Examples:
UART: python modbus_serial_test.py /dev/ttyUSB0 17 100 115200
Bluetooth: python modbus_serial_test.py /dev/rfcomm0 17 100
Windows: python modbus_serial_test.py COM3 17 100
For Bluetooth, pair the device first:
Linux: sudo rfcomm bind 0 <MAC_ADDRESS>
Windows: Use the COM port assigned after pairing
"""
import serial
import struct
import sys
import time
from collections import deque
def modbus_crc(data: bytes) -> bytes:
"""Calculate Modbus CRC-16."""
crc = 0xFFFF
for byte in data:
crc ^= byte
for _ in range(8):
if crc & 1:
crc = (crc >> 1) ^ 0xA001
else:
crc >>= 1
return struct.pack('<H', crc)
class ModbusSerialClient:
def __init__(self, port: str, baudrate: int = 115200, timeout: float = 1.0):
self.port = port
self.baudrate = baudrate
self.timeout = timeout
self.serial = None
def connect(self):
self.serial = serial.Serial(
port=self.port,
baudrate=self.baudrate,
bytesize=serial.EIGHTBITS,
parity=serial.PARITY_NONE,
stopbits=serial.STOPBITS_ONE,
timeout=self.timeout
)
# Clear any pending data
self.serial.reset_input_buffer()
self.serial.reset_output_buffer()
def close(self):
if self.serial:
self.serial.close()
self.serial = None
def read_registers(self, slave_id: int, address: int, count: int) -> tuple:
"""Read holding registers. Returns (values, latency_ms) or (None, latency_ms) on error."""
frame = struct.pack('>BBHH', slave_id, 0x03, address, count)
frame_with_crc = frame + modbus_crc(frame)
# Clear input buffer before sending
self.serial.reset_input_buffer()
start = time.perf_counter()
try:
self.serial.write(frame_with_crc)
self.serial.flush()
# Expected response: slave_id(1) + func(1) + byte_count(1) + data(count*2) + crc(2)
expected_len = 3 + count * 2 + 2
response = self.serial.read(expected_len)
latency = (time.perf_counter() - start) * 1000
if len(response) >= 5:
byte_count = response[2]
data = response[3:3+byte_count]
values = []
for i in range(0, len(data), 2):
if i + 1 < len(data):
values.append(struct.unpack('>H', data[i:i+2])[0])
return values, latency
return None, latency
except (serial.SerialException, serial.SerialTimeoutException) as e:
latency = (time.perf_counter() - start) * 1000
return None, latency
def write_register(self, slave_id: int, address: int, value: int) -> tuple:
"""Write single register. Returns (success, latency_ms)."""
frame = struct.pack('>BBHH', slave_id, 0x06, address, value)
frame_with_crc = frame + modbus_crc(frame)
self.serial.reset_input_buffer()
start = time.perf_counter()
try:
self.serial.write(frame_with_crc)
self.serial.flush()
# Response should echo the request
response = self.serial.read(8)
latency = (time.perf_counter() - start) * 1000
if len(response) >= 6:
return True, latency
return False, latency
except (serial.SerialException, serial.SerialTimeoutException):
latency = (time.perf_counter() - start) * 1000
return False, latency
def main():
if len(sys.argv) < 2:
print(f"Usage: {sys.argv[0]} <port> [slave_id] [interval_ms] [baudrate]")
print()
print("Examples:")
print(f" UART: {sys.argv[0]} /dev/ttyUSB0 17 100 115200")
print(f" Bluetooth: {sys.argv[0]} /dev/rfcomm0 17 100")
print(f" Windows: {sys.argv[0]} COM3 17 100")
print()
print("For Bluetooth, pair the device first:")
print(" Linux: sudo rfcomm bind 0 <MAC_ADDRESS>")
print(" Windows: Use the COM port from Device Manager")
sys.exit(1)
port = sys.argv[1]
slave_id = int(sys.argv[2]) if len(sys.argv) > 2 else 17
interval_ms = int(sys.argv[3]) if len(sys.argv) > 3 else 100
baudrate = int(sys.argv[4]) if len(sys.argv) > 4 else 115200
interval_s = interval_ms / 1000.0
print(f"Modbus Serial Test")
print(f"Port: {port}, Baudrate: {baudrate}, Slave ID: {slave_id}")
print(f"Interval: {interval_ms}ms ({1000/interval_ms:.1f} Hz)")
print("=" * 70)
print("Press Ctrl+C to stop and see statistics\n")
client = ModbusSerialClient(port, baudrate=baudrate)
try:
client.connect()
print(f"Connected to {port}\n")
except Exception as e:
print(f"Connection failed: {e}")
print("\nMake sure the device is connected and the port is correct.")
sys.exit(1)
latencies = deque(maxlen=1000) # Keep last 1000 samples
success_count = 0
error_count = 0
start_time = time.time()
try:
while True:
loop_start = time.perf_counter()
values, latency = client.read_registers(slave_id, 0, 4)
if values is not None:
success_count += 1
latencies.append(latency)
# Display current reading
elapsed = time.time() - start_time
rate = success_count / elapsed if elapsed > 0 else 0
vals_str = " ".join(f"{v:5d}" for v in values)
print(f"\r[{elapsed:7.1f}s] Regs: {vals_str} | "
f"Latency: {latency:5.1f}ms | "
f"Rate: {rate:5.1f}/s | "
f"OK: {success_count} Err: {error_count}",
end="", flush=True)
else:
error_count += 1
print(f"\r[Error] Timeout or invalid response (latency: {latency:.1f}ms)",
end="", flush=True)
# Sleep for remaining interval time
elapsed_loop = time.perf_counter() - loop_start
sleep_time = interval_s - elapsed_loop
if sleep_time > 0:
time.sleep(sleep_time)
except KeyboardInterrupt:
print("\n\n" + "=" * 70)
print("STATISTICS")
print("=" * 70)
total_time = time.time() - start_time
total_requests = success_count + error_count
print(f"Duration: {total_time:.1f} seconds")
print(f"Total requests: {total_requests}")
if total_requests > 0:
print(f"Successful: {success_count} ({100*success_count/total_requests:.1f}%)")
print(f"Errors: {error_count} ({100*error_count/total_requests:.1f}%)")
if total_time > 0:
print(f"Actual rate: {total_requests/total_time:.1f} req/s")
if latencies:
lat_list = list(latencies)
lat_list.sort()
print(f"\nLatency (last {len(lat_list)} samples):")
print(f" Min: {min(lat_list):.2f} ms")
print(f" Max: {max(lat_list):.2f} ms")
print(f" Avg: {sum(lat_list)/len(lat_list):.2f} ms")
print(f" Median: {lat_list[len(lat_list)//2]:.2f} ms")
if len(lat_list) >= 20:
print(f" P95: {lat_list[int(len(lat_list)*0.95)]:.2f} ms")
if len(lat_list) >= 100:
print(f" P99: {lat_list[int(len(lat_list)*0.99)]:.2f} ms")
finally:
client.close()
print("\nConnection closed.")
if __name__ == '__main__':
main()
#!/usr/bin/env python3
"""
Modbus RTU over TCP test client.
Usage: python modbus_test.py <ip> [port]
"""
import socket
import struct
import sys
def modbus_crc(data: bytes) -> bytes:
"""Calculate Modbus CRC-16."""
crc = 0xFFFF
for byte in data:
crc ^= byte
for _ in range(8):
if crc & 1:
crc = (crc >> 1) ^ 0xA001
else:
crc >>= 1
return struct.pack('<H', crc) # Little-endian (low byte first)
def send_modbus(ip: str, port: int, frame: bytes, timeout: float = 2.0) -> bytes:
"""Send a Modbus frame and receive response."""
frame_with_crc = frame + modbus_crc(frame)
print(f"TX: {frame_with_crc.hex(' ').upper()}")
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.settimeout(timeout)
s.connect((ip, port))
s.send(frame_with_crc)
try:
response = s.recv(256)
print(f"RX: {response.hex(' ').upper()}")
return response
except socket.timeout:
print("RX: <timeout>")
return b''
def read_holding_registers(ip: str, port: int, slave_id: int, address: int, count: int) -> bytes:
"""Read holding registers (FC 0x03)."""
frame = struct.pack('>BBHH', slave_id, 0x03, address, count)
return send_modbus(ip, port, frame)
def read_input_registers(ip: str, port: int, slave_id: int, address: int, count: int) -> bytes:
"""Read input registers (FC 0x04)."""
frame = struct.pack('>BBHH', slave_id, 0x04, address, count)
return send_modbus(ip, port, frame)
def write_single_register(ip: str, port: int, slave_id: int, address: int, value: int) -> bytes:
"""Write single register (FC 0x06)."""
frame = struct.pack('>BBHH', slave_id, 0x06, address, value)
return send_modbus(ip, port, frame)
def write_multiple_registers(ip: str, port: int, slave_id: int, address: int, values: list) -> bytes:
"""Write multiple registers (FC 0x10)."""
count = len(values)
byte_count = count * 2
frame = struct.pack('>BBHHB', slave_id, 0x10, address, count, byte_count)
for v in values:
frame += struct.pack('>H', v)
return send_modbus(ip, port, frame)
def read_coils(ip: str, port: int, slave_id: int, address: int, count: int) -> bytes:
"""Read coils (FC 0x01)."""
frame = struct.pack('>BBHH', slave_id, 0x01, address, count)
return send_modbus(ip, port, frame)
def write_single_coil(ip: str, port: int, slave_id: int, address: int, value: bool) -> bytes:
"""Write single coil (FC 0x05)."""
coil_value = 0xFF00 if value else 0x0000
frame = struct.pack('>BBHH', slave_id, 0x05, address, coil_value)
return send_modbus(ip, port, frame)
def parse_register_response(response: bytes) -> list:
"""Parse a read registers response and return list of register values."""
if len(response) < 5:
return []
# Skip slave_id, function, byte_count
byte_count = response[2]
data = response[3:3+byte_count]
registers = []
for i in range(0, len(data), 2):
if i + 1 < len(data):
registers.append(struct.unpack('>H', data[i:i+2])[0])
return registers
def main():
if len(sys.argv) < 2:
print(f"Usage: {sys.argv[0]} <ip> [port] [slave_id]")
print(f"Example: {sys.argv[0]} 192.168.1.100 502 17")
sys.exit(1)
ip = sys.argv[1]
port = int(sys.argv[2]) if len(sys.argv) > 2 else 502
slave_id = int(sys.argv[3]) if len(sys.argv) > 3 else 17
print(f"Testing Modbus RTU over TCP at {ip}:{port}, slave ID {slave_id}")
print("=" * 60)
# Test 1: Read 4 holding registers at address 0
print("\n[Test 1] Read 4 holding registers at address 0 (FC 0x03)")
response = read_holding_registers(ip, port, slave_id, 0, 4)
if response:
values = parse_register_response(response)
print(f"Values: {values}")
print(f"As hex: {[hex(v) for v in values]}")
# Test 2: Write single register
print("\n[Test 2] Write value 0x1234 to register 0 (FC 0x06)")
response = write_single_register(ip, port, slave_id, 0, 0x1234)
# Test 3: Read back to verify
print("\n[Test 3] Read back register 0 to verify write")
response = read_holding_registers(ip, port, slave_id, 0, 1)
if response:
values = parse_register_response(response)
print(f"Value: {values[0] if values else 'N/A'} (expected: 0x1234 = 4660)")
# Test 4: Read coils
print("\n[Test 4] Read 16 coils at address 0 (FC 0x01)")
response = read_coils(ip, port, slave_id, 0, 16)
print("\n" + "=" * 60)
print("Tests complete!")
if __name__ == '__main__':
main()
\ No newline at end of file
......@@ -5,8 +5,12 @@ description = "Test tools for ESP32 Modbus library"
requires-python = ">=3.10"
dependencies = [
"matplotlib>=3.10.8",
"pydantic-settings>=2.0.0",
"pymodbus>=3.6.0",
"pyserial>=3.5",
"pyyaml>=6.0",
"requests>=2.32.5",
"textual>=3.0.0",
"websocket-client>=1.9.0",
]
......
This diff is collapsed.
#!/usr/bin/env python3
"""
Servo Motion Profile Test (UART) - Plots S-curve/trapezoidal motion profiles
Connects to ESP32 via UART Modbus, commands a position move,
and records position/velocity/acceleration at high frequency to visualize the motion profile.
Usage: python servo_profile_test_uart.py <PORT> [distance] [options]
Example: python servo_profile_test_uart.py /dev/ttyUSB0 10000
python servo_profile_test_uart.py COM3 5000 --jerk 100000
"""
import struct
import time
import argparse
from dataclasses import dataclass
from typing import Optional, List, Tuple
import serial
# Modbus register offsets (calculated from ControllerData struct layout)
REG_EXECUTION_INTERVAL = 0
REG_EXECUTION_CYCLES = 6
REG_CPU_FREQ_HZ = 8
# FastData starts at offset 10
REG_FAST_SERVO_CURRENT = 10
REG_FAST_SERVO_DESIRED = 12
REG_FAST_SERVO_MODE = 14
REG_FAST_SCALE_CURRENT = 16
REG_FAST_CYCLES = 24
# ServoAxis[0] starts at offset 28
REG_SERVO0_MAX_SPEED = 28
REG_SERVO0_CURRENT_SPEED = 30
REG_SERVO0_JOG_SPEED = 32
REG_SERVO0_REF_SPEED_FWD = 34
REG_SERVO0_REF_SPEED_REV = 36
REG_SERVO0_MAX_ACCEL = 38
REG_SERVO0_CURRENT_ACCEL = 40
REG_SERVO0_JERK = 42
REG_SERVO0_STEPS_TO_GO = 44
REG_SERVO0_PREVIOUS_STEPS = 46
REG_SERVO0_CURRENT_STEPS = 48
REG_SERVO0_DESIRED_STEPS = 50
REG_SERVO0_MODE = 62
# GlobalMode flags
GLOBAL_MODE_ENABLE = 0x0001
GLOBAL_MODE_MOTION_HOLD = 0x0002
# ServoMode flags
SERVO_MODE_JOG = 0x0002
SERVO_MODE_POSITION = 0x0004
SERVO_MODE_SYNC = 0x0008
SERVO_MODE_REF_REQUEST = 0x0010
SERVO_MODE_REF_FORWARD = 0x0020
SERVO_MODE_REF_REVERSE = 0x0040
SERVO_MODE_REF_DONE = 0x0080
SERVO_MODE_REF_ERROR = 0x0100
def float_to_regs(value: float) -> Tuple[int, int]:
"""Convert a float to two 16-bit registers (little-endian, word swap)."""
packed = struct.pack('<f', value)
r0 = struct.unpack('<H', packed[0:2])[0]
r1 = struct.unpack('<H', packed[2:4])[0]
return r0, r1
def int32_to_regs(value: int) -> Tuple[int, int]:
"""Convert a signed int32 to two 16-bit registers (little-endian, word swap)."""
packed = struct.pack('<i', value)
r0 = struct.unpack('<H', packed[0:2])[0]
r1 = struct.unpack('<H', packed[2:4])[0]
return r0, r1
def regs_to_float(regs: List[int]) -> float:
"""Convert two 16-bit registers to a float (little-endian, word swap)."""
packed = struct.pack('<HH', regs[0], regs[1])
return struct.unpack('<f', packed)[0]
def regs_to_int32(regs: List[int]) -> int:
"""Convert two 16-bit registers to a signed int32 (little-endian, word swap)."""
packed = struct.pack('<HH', regs[0], regs[1])
return struct.unpack('<i', packed)[0]
def regs_to_uint32(regs: List[int]) -> int:
"""Convert two 16-bit registers to an unsigned uint32 (little-endian, word swap)."""
packed = struct.pack('<HH', regs[0], regs[1])
return struct.unpack('<I', packed)[0]
def modbus_crc(data: bytes) -> bytes:
"""Calculate Modbus CRC-16."""
crc = 0xFFFF
for byte in data:
crc ^= byte
for _ in range(8):
if crc & 1:
crc = (crc >> 1) ^ 0xA001
else:
crc >>= 1
return struct.pack('<H', crc)
class ModbusUartClient:
def __init__(self, port: str, baudrate: int = 115200, timeout: float = 0.5):
self.port = port
self.baudrate = baudrate
self.timeout = timeout
self.serial = None
self.slave_id = 17
def connect(self):
print(f"Opening {self.port} at {self.baudrate} baud...")
self.serial = serial.Serial(
port=self.port,
baudrate=self.baudrate,
bytesize=serial.EIGHTBITS,
parity=serial.PARITY_NONE,
stopbits=serial.STOPBITS_ONE,
timeout=self.timeout
)
# Clear any pending data
self.serial.reset_input_buffer()
self.serial.reset_output_buffer()
print("Connected!")
def close(self):
if self.serial:
self.serial.close()
self.serial = None
def read_registers(self, address: int, count: int) -> Optional[List[int]]:
"""Read holding registers."""
frame = struct.pack('>BBHH', self.slave_id, 0x03, address, count)
frame_with_crc = frame + modbus_crc(frame)
try:
self.serial.reset_input_buffer()
self.serial.write(frame_with_crc)
self.serial.flush()
expected_len = 3 + count * 2 + 2
response = self.serial.read(expected_len)
if len(response) >= 5:
# Verify CRC
received_crc = response[-2:]
calculated_crc = modbus_crc(response[:-2])
if received_crc != calculated_crc:
print(f"CRC error: expected {calculated_crc.hex()}, got {received_crc.hex()}")
return None
byte_count = response[2]
data = response[3:3+byte_count]
values = []
for i in range(0, len(data), 2):
if i + 1 < len(data):
values.append(struct.unpack('>H', data[i:i+2])[0])
return values
return None
except Exception as e:
print(f"Read error: {e}")
return None
def write_register(self, address: int, value: int) -> bool:
"""Write single holding register."""
frame = struct.pack('>BBHH', self.slave_id, 0x06, address, value)
frame_with_crc = frame + modbus_crc(frame)
try:
self.serial.reset_input_buffer()
self.serial.write(frame_with_crc)
self.serial.flush()
response = self.serial.read(8)
return len(response) == 8
except Exception as e:
print(f"Write error: {e}")
return False
def write_registers(self, address: int, values: List[int]) -> bool:
"""Write multiple holding registers."""
count = len(values)
byte_count = count * 2
frame = struct.pack('>BBHHB', self.slave_id, 0x10, address, count, byte_count)
for v in values:
frame += struct.pack('>H', v)
frame_with_crc = frame + modbus_crc(frame)
try:
self.serial.reset_input_buffer()
self.serial.write(frame_with_crc)
self.serial.flush()
response = self.serial.read(8)
return len(response) == 8
except Exception as e:
print(f"Write error: {e}")
return False
@dataclass
class MotionSample:
time_ms: float
position: int
speed: float
exec_interval: int # Execution interval in CPU cycles
steps_to_go: int
def read_and_display_mode_flags(client: ModbusUartClient):
"""Read and display current mode flags for debugging."""
regs = client.read_registers(REG_FAST_SERVO_MODE, 2)
if regs:
global_mode = regs[0] | (regs[1] << 16)
print(f" GlobalMode: 0x{global_mode:04X}")
print(f" - enable: {bool(global_mode & GLOBAL_MODE_ENABLE)}")
print(f" - motionHold: {bool(global_mode & GLOBAL_MODE_MOTION_HOLD)}")
regs = client.read_registers(REG_SERVO0_MODE, 2)
if regs:
servo_mode = regs[0] | (regs[1] << 16)
print(f" ServoMode: 0x{servo_mode:04X}")
print(f" - jog: {bool(servo_mode & SERVO_MODE_JOG)}")
print(f" - position: {bool(servo_mode & SERVO_MODE_POSITION)}")
print(f" - sync: {bool(servo_mode & SERVO_MODE_SYNC)}")
print(f" - refRequest: {bool(servo_mode & SERVO_MODE_REF_REQUEST)}")
print(f" - refDone: {bool(servo_mode & SERVO_MODE_REF_DONE)}")
def run_motion_test(client: ModbusUartClient, distance: int,
max_speed: float, max_accel: float, jerk: float,
poll_interval_ms: float = 10) -> List[MotionSample]:
"""Execute a motion profile and record samples."""
samples = []
print("\n--- Reading Current Mode Flags ---")
read_and_display_mode_flags(client)
print("\n--- Configuring Motion Parameters ---")
print(f" Max Speed: {max_speed} steps/s")
r0, r1 = float_to_regs(max_speed)
client.write_registers(REG_SERVO0_MAX_SPEED, [r0, r1])
print(f" Max Acceleration: {max_accel} steps/s^2")
r0, r1 = float_to_regs(max_accel)
client.write_registers(REG_SERVO0_MAX_ACCEL, [r0, r1])
print(f" Jerk: {jerk} steps/s^3")
r0, r1 = float_to_regs(jerk)
client.write_registers(REG_SERVO0_JERK, [r0, r1])
print(" Resetting speed and acceleration...")
r0, r1 = float_to_regs(0.0)
client.write_registers(REG_SERVO0_CURRENT_SPEED, [r0, r1])
client.write_registers(REG_SERVO0_CURRENT_ACCEL, [r0, r1])
r0, r1 = int32_to_regs(0)
client.write_registers(REG_SERVO0_STEPS_TO_GO, [r0, r1])
print("\n--- Setting Mode Flags ---")
print(" Clearing ServoMode flags...")
client.write_registers(REG_SERVO0_MODE, [0x0000, 0x0000])
time.sleep(0.05)
print(f" Setting position mode (SERVO_MODE_POSITION = 0x{SERVO_MODE_POSITION:04X})...")
client.write_registers(REG_SERVO0_MODE, [SERVO_MODE_POSITION, 0x0000])
time.sleep(0.05)
print(f" Enabling servo (GLOBAL_MODE_ENABLE = 0x{GLOBAL_MODE_ENABLE:04X})...")
client.write_registers(REG_FAST_SERVO_MODE, [GLOBAL_MODE_ENABLE, 0x0000])
time.sleep(0.1)
print("\n--- Verifying Mode Flags ---")
read_and_display_mode_flags(client)
regs = client.read_registers(REG_SERVO0_CURRENT_STEPS, 2)
if regs:
initial_pos = regs_to_uint32(regs)
print(f"\n Initial position: {initial_pos}")
else:
initial_pos = 0
print(f"\n--- Starting Motion: {distance} steps ---")
r0, r1 = int32_to_regs(distance)
client.write_registers(REG_SERVO0_STEPS_TO_GO, [r0, r1])
start_time = time.perf_counter()
poll_interval_s = poll_interval_ms / 1000.0
motion_complete = False
last_steps_to_go = distance
stable_count = 0
while not motion_complete:
loop_start = time.perf_counter()
elapsed_ms = (loop_start - start_time) * 1000
# Read execution interval (2 registers at offset 0)
interval_regs = client.read_registers(REG_EXECUTION_INTERVAL, 2)
regs = client.read_registers(REG_SERVO0_CURRENT_SPEED, 16)
if regs and interval_regs:
current_speed = regs_to_float(regs[0:2])
steps_to_go = regs_to_int32(regs[14:16])
exec_interval = regs_to_uint32(interval_regs)
pos_regs = client.read_registers(REG_SERVO0_CURRENT_STEPS, 2)
if pos_regs:
position = regs_to_uint32(pos_regs)
else:
position = 0
sample = MotionSample(
time_ms=elapsed_ms,
position=position - initial_pos,
speed=current_speed,
exec_interval=exec_interval,
steps_to_go=steps_to_go
)
samples.append(sample)
if position == last_steps_to_go and abs(current_speed) < 1:
stable_count += 1
if stable_count >= 5:
motion_complete = True
else:
stable_count = 0
if elapsed_ms > 30000:
print("\nTimeout!")
break
progress = 100 * (1 - abs(steps_to_go) / abs(distance)) if distance != 0 else 100
# Convert exec_interval to microseconds (assuming 240MHz CPU)
exec_us = exec_interval / 240.0
print(f"\r [{elapsed_ms:7.1f}ms] pos={position-initial_pos:8d} "
f"spd={current_speed:8.1f} interval={exec_us:6.2f}µs "
f"toGo={steps_to_go:8d} ({progress:5.1f}%)", end="", flush=True)
if progress >= 100:
motion_complete = True
elapsed_loop = time.perf_counter() - loop_start
sleep_time = poll_interval_s - elapsed_loop
if sleep_time > 0:
time.sleep(sleep_time)
print(f"\n\n--- Motion Complete ---")
if samples:
print(f" Total time: {samples[-1].time_ms:.1f} ms")
print(f" Final position: {samples[-1].position}")
print(f" Samples collected: {len(samples)}")
return samples
def plot_motion_profile(samples: List[MotionSample], title: str = "Servo Motion Profile"):
"""Plot position, velocity, and execution interval profiles."""
try:
import matplotlib.pyplot as plt
except ImportError:
print("\nMatplotlib not installed. Install with: pip install matplotlib")
print("Saving data to CSV instead...")
save_to_csv(samples)
return
times = [s.time_ms for s in samples]
positions = [s.position for s in samples]
speeds = [s.speed for s in samples]
# Convert execution interval to microseconds (assuming 240MHz CPU)
exec_intervals_us = [s.exec_interval / 240.0 for s in samples]
fig, axes = plt.subplots(3, 1, figsize=(12, 10), sharex=True)
fig.suptitle(title, fontsize=14)
axes[0].plot(times, positions, 'b-', linewidth=1.5)
axes[0].set_ylabel('Position (steps)', fontsize=11)
axes[0].grid(True, alpha=0.3)
axes[0].set_title('Position vs Time')
axes[1].plot(times, speeds, 'g-', linewidth=1.5)
axes[1].set_ylabel('Velocity (steps/s)', fontsize=11)
axes[1].grid(True, alpha=0.3)
axes[1].set_title('Velocity vs Time')
axes[1].axhline(y=0, color='k', linestyle='-', linewidth=0.5)
axes[2].plot(times, exec_intervals_us, 'r-', linewidth=1.5)
axes[2].set_ylabel('Execution Interval (µs)', fontsize=11)
axes[2].set_xlabel('Time (ms)', fontsize=11)
axes[2].grid(True, alpha=0.3)
axes[2].set_title('Loop Timing (target: 10µs for 100kHz)')
axes[2].axhline(y=10.0, color='k', linestyle='--', linewidth=1, label='Target (10µs)')
axes[2].legend()
plt.tight_layout()
plt.show()
def save_to_csv(samples: List[MotionSample], filename: str = "motion_profile.csv"):
"""Save samples to CSV file."""
with open(filename, 'w') as f:
f.write("time_ms,position,speed,exec_interval_cycles,exec_interval_us,steps_to_go\n")
for s in samples:
exec_us = s.exec_interval / 240.0 # Convert to µs assuming 240MHz
f.write(f"{s.time_ms:.2f},{s.position},{s.speed:.2f},{s.exec_interval},{exec_us:.2f},{s.steps_to_go}\n")
print(f"Data saved to {filename}")
def main():
parser = argparse.ArgumentParser(description='Servo Motion Profile Test (UART)')
parser.add_argument('port', help='Serial port (e.g., /dev/ttyUSB0, COM3)', nargs='?', default="/dev/ttyUSB0")
parser.add_argument('distance', type=int, nargs='?', default=60000,
help='Distance to move in steps (default: 60000)')
parser.add_argument('--baudrate', '-b', type=int, default=115200,
help='Baud rate (default: 115200)')
parser.add_argument('--speed', '-s', type=float, default=10000,
help='Max speed in steps/s (default: 10000)')
parser.add_argument('--accel', '-a', type=float, default=2500,
help='Max acceleration in steps/s^2 (default: 2500)')
parser.add_argument('--jerk', '-j', type=float, default=50000,
help='Jerk in steps/s^3 (default: 50000, 0=trapezoidal)')
parser.add_argument('--poll', '-p', type=float, default=50,
help='Poll interval in ms (default: 50)')
parser.add_argument('--csv', type=str, default=None,
help='Save to CSV file instead of plotting')
args = parser.parse_args()
print("=" * 60)
print("Servo Motion Profile Test (UART)")
print("=" * 60)
print(f"Port: {args.port}")
print(f"Baudrate: {args.baudrate}")
print(f"Distance: {args.distance} steps")
print(f"Max Speed: {args.speed} steps/s")
print(f"Max Accel: {args.accel} steps/s^2")
print(f"Jerk: {args.jerk} steps/s^3 {'(S-curve)' if args.jerk > 0 else '(Trapezoidal)'}")
print(f"Poll Interval: {args.poll} ms")
client = ModbusUartClient(args.port, baudrate=args.baudrate)
try:
client.connect()
samples = run_motion_test(
client,
args.distance,
max_speed=args.speed,
max_accel=args.accel,
jerk=args.jerk,
poll_interval_ms=args.poll
)
if samples:
if args.csv:
save_to_csv(samples, args.csv)
else:
title = f"Motion Profile (UART): {args.distance} steps, S={args.jerk}"
plot_motion_profile(samples, title)
except KeyboardInterrupt:
print("\nAborted by user")
except Exception as e:
print(f"\nError: {e}")
import traceback
traceback.print_exc()
finally:
try:
client.write_register(REG_FAST_SERVO_MODE, 0x0000)
print("\nServo disabled")
except:
pass
client.close()
if __name__ == '__main__':
main()
\ No newline at end of file
"""
Application settings with YAML persistence.
Uses Pydantic for validation and stores settings in a YAML file.
"""
from pathlib import Path
from typing import Literal, Optional
import yaml
from pydantic import BaseModel, Field
from controller import MODBUS_SLAVE_ID
# Settings file location (in tools directory)
SETTINGS_FILE = Path(__file__).parent / "settings.yaml"
class TransportSettings(BaseModel):
"""Common transport settings."""
# Default transport type
default_transport: Literal["uart", "bt", "tcp"] = "uart"
# UART settings
serial_port: str = "/dev/ttyUSB0"
baudrate: int = 115200
# Bluetooth settings
bluetooth_mac: str = ""
bluetooth_channel: int = 1
# TCP settings
tcp_address: str = "192.168.1.100:502"
# Modbus settings
slave_id: int = MODBUS_SLAVE_ID
class LatencyTestSettings(BaseModel):
"""Settings for latency test."""
interval_ms: int = 10
register_count: int = 4
start_address: int = 0
full_mode: bool = False
dump_mode: bool = False
class ProfileTestSettings(BaseModel):
"""Settings for motion profile test."""
distance: int = 60000
max_speed: float = 10000.0
max_accel: float = 2500.0
jerk: float = 50000.0
poll_interval_ms: float = 50.0
save_csv: bool = False
csv_filename: str = "motion_profile.csv"
class HttpTestSettings(BaseModel):
"""Settings for HTTP/WebSocket test."""
host: str = "192.168.1.100"
http_port: int = 80
ws_port: int = 81
interval_ms: int = 100
timeout: float = 2.0
default_mode: Literal["polling", "websocket", "info", "dump"] = "polling"
full_mode: bool = True
class AppSettings(BaseModel):
"""Main application settings."""
transport: TransportSettings = Field(default_factory=TransportSettings)
latency: LatencyTestSettings = Field(default_factory=LatencyTestSettings)
profile: ProfileTestSettings = Field(default_factory=ProfileTestSettings)
http: HttpTestSettings = Field(default_factory=HttpTestSettings)
@classmethod
def load(cls, path: Optional[Path] = None) -> "AppSettings":
"""Load settings from YAML file."""
path = path or SETTINGS_FILE
if not path.exists():
return cls()
try:
with open(path, "r") as f:
data = yaml.safe_load(f) or {}
return cls.model_validate(data)
except Exception as e:
print(f"Warning: Could not load settings from {path}: {e}")
return cls()
def save(self, path: Optional[Path] = None) -> None:
"""Save settings to YAML file."""
path = path or SETTINGS_FILE
try:
with open(path, "w") as f:
yaml.dump(
self.model_dump(),
f,
default_flow_style=False,
sort_keys=False,
allow_unicode=True,
)
except Exception as e:
print(f"Warning: Could not save settings to {path}: {e}")
# Global settings instance
_settings: Optional[AppSettings] = None
def get_settings() -> AppSettings:
"""Get the global settings instance, loading from file if needed."""
global _settings
if _settings is None:
_settings = AppSettings.load()
return _settings
def save_settings() -> None:
"""Save the current settings to file."""
if _settings is not None:
_settings.save()
def reload_settings() -> AppSettings:
"""Reload settings from file."""
global _settings
_settings = AppSettings.load()
return _settings
\ No newline at end of file
transport:
default_transport: uart
serial_port: /dev/serial/by-id/usb-1a86_USB_Single_Serial_5AAB023274-if00
baudrate: 115200
bluetooth_mac: ''
bluetooth_channel: 1
tcp_address: 192.168.1.100:502
slave_id: 17
latency:
interval_ms: 10
register_count: 4
start_address: 0
full_mode: false
dump_mode: false
profile:
distance: 60000
max_speed: 10000.0
max_accel: 2500.0
jerk: 50000.0
poll_interval_ms: 50.0
save_csv: false
csv_filename: motion_profile.csv
http:
host: 10.1.2.168
http_port: 80
ws_port: 81
interval_ms: 50
timeout: 2.0
default_mode: polling
full_mode: true
"""
Test modules for ESP-Modbus tools.
"""
from .latency import run_latency_test, StopToken
from .profile import run_profile_test
from .http_api import run_http_test
__all__ = ['run_latency_test', 'run_profile_test', 'run_http_test', 'StopToken']
\ No newline at end of file
This diff is collapsed.
This diff is collapsed.
"""
ESP-Modbus TUI - User Interface Module.
"""
from ui.app import ESPModbusApp
__all__ = ["ESPModbusApp"]
\ No newline at end of file
"""
Main application class for the ESP-Modbus TUI.
"""
from textual.app import App
from textual.binding import Binding
from ui.menu import MainMenuScreen
class ESPModbusApp(App):
"""ESP-Modbus Test Tools Application."""
TITLE = "ESP-Modbus Test Tools"
BINDINGS = [
Binding("q", "quit", "Quit", show=True),
]
def on_mount(self) -> None:
self.push_screen(MainMenuScreen())
\ No newline at end of file
This diff is collapsed.