import uuid
from abc import ABC, abstractmethod
from datetime import datetime, timezone
from typing import TYPE_CHECKING, Any, Dict, Generic, Optional, Type, TypeVar, cast
from msgcenterpy.core.envelope import FormatMetadata, MessageEnvelope, Properties
from msgcenterpy.core.field_accessor import (
FieldAccessor,
FieldAccessorFactory,
TypeInfoProvider,
)
from msgcenterpy.core.types import MessageType
if TYPE_CHECKING:
# 仅用于类型检查的导入,避免运行时循环依赖
from msgcenterpy.instances.json_schema_instance import JSONSchemaMessageInstance
from msgcenterpy.instances.ros2_instance import ROS2MessageInstance
T = TypeVar("T")
[docs]
class MessageInstance(TypeInfoProvider, ABC, Generic[T]):
"""统一消息实例基类"""
_init_ok: bool = False
# 字段访问器相关方法
@property
def fields(self) -> FieldAccessor:
if self._field_accessor is None:
raise RuntimeError("FieldAccessor not initialized")
return self._field_accessor
def __setattr__(self, field_name: str, value: Any) -> None:
if not self._init_ok:
return super().__setattr__(field_name, value)
for cls in self.__class__.__mro__:
if field_name in cls.__dict__:
return super().__setattr__(field_name, value)
self.fields[field_name] = value
return None
def __getattr__(self, field_name: str) -> Any:
if not self._init_ok:
return super().__getattribute__(field_name)
for cls in self.__class__.__mro__:
if field_name in cls.__dict__:
return super().__getattribute__(field_name)
return self.fields[field_name]
[docs]
def __getitem__(self, field_name: str) -> Any:
"""支持通过下标访问字段"""
return self.fields[field_name]
[docs]
def __setitem__(self, field_name: str, value: Any) -> None:
"""支持通过下标设置字段"""
self.fields[field_name] = value
[docs]
def __contains__(self, field_name: str) -> bool:
"""支持in操作符检查字段是否存在"""
return field_name in self.fields
[docs]
def __init__(
self,
inner_data: T,
message_type: MessageType,
metadata: Optional[FormatMetadata] = None,
):
# 初始化标记和基础属性
self._field_accessor: Optional[FieldAccessor] = None
self._instance_id: str = str(uuid.uuid4())
self.inner_data: T = inner_data # 原始类型数据
self.message_type: MessageType = message_type
self._metadata: FormatMetadata = metadata or FormatMetadata()
self._created_at = datetime.now(timezone.utc)
self._collect_public_properties_to_metadata()
self._field_accessor = FieldAccessorFactory.create_accessor(self.inner_data, self)
self._init_ok = True
def _collect_public_properties_to_metadata(self) -> None:
"""将所有非下划线开头的 @property 的当前值放入 metadata.properties 中。
仅收集只读属性,忽略访问抛出异常的属性。
"""
properties_bucket = self._metadata.setdefault("properties", Properties())
for cls in self.__class__.__mro__:
for attribute_name, attribute_value in cls.__dict__.items():
if attribute_name.startswith("_"):
continue
if isinstance(attribute_value, property):
try:
# 避免重复收集已存在的属性
if attribute_name not in properties_bucket:
properties_bucket[attribute_name] = getattr(self, attribute_name) # type: ignore[literal-required]
except (AttributeError, TypeError, RuntimeError):
# Skip attributes that can't be accessed or have incompatible types
# This includes attributes that require initialization to complete (like 'fields')
pass
[docs]
def to(self, target_type: MessageType, **kwargs: Any) -> "MessageInstance[Any]":
"""直接转换到目标类型"""
if target_type == MessageType.ROS2:
return cast("MessageInstance[Any]", self.to_ros2(**kwargs))
elif target_type == MessageType.DICT:
return cast("MessageInstance[Any]", self.to_dict(**kwargs))
elif target_type == MessageType.JSON:
return cast("MessageInstance[Any]", self.to_json(**kwargs))
elif target_type == MessageType.JSON_SCHEMA:
return cast("MessageInstance[Any]", self.to_json_schema(**kwargs))
elif target_type == MessageType.YAML:
return cast("MessageInstance[Any]", self.to_yaml(**kwargs))
elif target_type == MessageType.PYDANTIC:
return cast("MessageInstance[Any]", self.to_pydantic(**kwargs))
elif target_type == MessageType.DATACLASS:
return cast("MessageInstance[Any]", self.to_dataclass(**kwargs))
else:
raise ValueError(f"Unsupported target message type: {target_type}")
[docs]
@classmethod
@abstractmethod
def import_from_envelope(cls, data: MessageEnvelope, **kwargs: Any) -> "MessageInstance[Any]":
"""从统一信封字典创建实例(仅接受 data 一个参数)。"""
# metadata会被重置
pass
[docs]
@abstractmethod
def export_to_envelope(self, **kwargs: Any) -> MessageEnvelope:
"""导出为字典格式"""
pass
[docs]
@abstractmethod
def get_python_dict(self) -> Dict[str, Any]:
"""获取当前所有的字段名和对应的python可读值"""
pass
[docs]
@abstractmethod
def set_python_dict(self, value: Dict[str, Any], **kwargs: Any) -> bool:
"""设置所有字段的值"""
pass
[docs]
def get_json_schema(self) -> Dict[str, Any]:
"""生成当前消息实例的JSON Schema,委托给FieldAccessor递归处理"""
# 直接调用FieldAccessor的get_json_schema方法
schema = self.fields.get_json_schema()
# 添加schema元信息(对于JSONSchemaMessageInstance,如果已有title则保持,否则添加默认title)
from msgcenterpy.instances.json_schema_instance import JSONSchemaMessageInstance
if isinstance(self, JSONSchemaMessageInstance):
# 对于JSON Schema实例,如果schema中没有title,则添加一个
if "title" not in schema:
schema["title"] = f"{self.__class__.__name__} Schema" # type: ignore
if "description" not in schema:
schema["description"] = f"JSON Schema generated from {self.message_type.value} message instance" # type: ignore
else:
# 对于其他类型的实例,总是添加schema元信息
schema["title"] = f"{self.__class__.__name__} Schema" # type: ignore
schema["description"] = f"JSON Schema generated from {self.message_type.value} message instance" # type: ignore
return schema
def __repr__(self) -> str:
return f"{self.__class__.__name__}(type={self.message_type.value}, id={self._instance_id[:8]})"
# 便捷转换方法,使用MessageCenter单例
[docs]
def to_ros2(self, type_hint: str | Type[Any], **kwargs: Any) -> "ROS2MessageInstance":
"""转换到ROS2实例。传入必备的类型提示,"""
override_properties = {}
from msgcenterpy.core.message_center import get_message_center
ros2_message_instance = cast(
ROS2MessageInstance,
get_message_center().get_instance_class(MessageType.ROS2),
)
ros_type = ros2_message_instance.obtain_ros_cls_from_str(type_hint)
override_properties["ros_msg_cls_path"] = ROS2MessageInstance.get_ros_msg_cls_path(ros_type)
override_properties["ros_msg_cls_namespace"] = ROS2MessageInstance.get_ros_msg_cls_namespace(ros_type)
return cast(
ROS2MessageInstance,
get_message_center().convert(self, MessageType.ROS2, override_properties, **kwargs),
)
[docs]
def to_json_schema(self, **kwargs: Any) -> "JSONSchemaMessageInstance":
"""转换到JSON Schema实例"""
override_properties = {"json_schema": self.get_json_schema()}
from msgcenterpy.core.message_center import get_message_center
from msgcenterpy.instances.json_schema_instance import JSONSchemaMessageInstance
return cast(
JSONSchemaMessageInstance,
get_message_center().convert(self, MessageType.JSON_SCHEMA, override_properties, **kwargs),
)