def __init__(self, reading_client, base_url: str = "http://localhost:59880"):
"""
初始化数据获取器
:param base_url: EdgeX Core Data 服务地址
"""
self.reading_client = reading_client
self.base_url = base_url
def get_readings(self, ctx: AppFunctionContext, data: Any) -> Tuple[bool, Any]:
"""
获取读数
:param ctx: 应用函数上下文
:param data: 输入数据(包含设备名称和资源名称)
:return: 包含读数数据的字典
"""
if data is None:
return False, errors.new_common_edgex(errors.ErrKind.CONTRACT_INVALID, "CoreDataGetter: No Data Received")
if isinstance(data, Event) is False:
return False, errors.new_common_edgex(
errors.ErrKind.CONTRACT_INVALID,
f"CoreDataGetter: type received is not an Event " f"in pipeline '{ctx.pipeline_id()}'",
)
event_id = data.id
ctx.logger().info(f"CoreDataGetter: log event_id: '{event_id}'")
# readings = self.get_readings_by_device_and_resource(
# ctx,
# device_name="CoalFlowMeter1",
# resource_name="FlowRate1",
# offset=0,
# limit=20,
# start=None,
# end=None
# )
# ctx.logger().info(f"CoreDataGetter: log readings: {readings}")
ctx.logger().info(f"################################################")
import time
current_time = time.time()
query_ns = int(10 * 60 * 1e9)
start_time = int(current_time * 1000000000 - query_ns)
end_time = int(current_time * 1000000000 )
readings_by_client = self.get_readings_by_device_and_resource_by_client(
ctx,
device_name="CoalFlowMeter1",
resource_name="FlowRate1",
offset=0,
limit=20,
start=start_time,
end=end_time
)
ctx.logger().info(f"CoreDataGetter: log readings_by_client: {readings_by_client}")
ctx.logger().info(f"################################################")
return True, data
def get_readings_by_device_and_resource_by_client(
self,
ctx: AppFunctionContext,
device_name: str,
resource_name: str,
offset: Optional[int] = None,
limit: Optional[int] = None,
start: Optional[int] = None,
end: Optional[int] = None
) -> Dict[str, Any]:
context_dict = {
'X-Correlation-ID': str(uuid.uuid4()),
'correlation-id': str(uuid.uuid4())
}
data = self.reading_client.readings_by_device_name_and_resource_name_and_time_range(
ctx=context_dict,
device_name=device_name,
resource_name=resource_name,
offset=offset,
limit=limit,
start=start,
end=end
)
# ctx.logger().info(f"CoreDataGetter: log data: {data}")
return data
def get_readings_by_device_and_resource(
self,
ctx: AppFunctionContext,
device_name: str,
resource_name: str,
offset: Optional[int] = None,
limit: Optional[int] = None,
start: Optional[int] = None,
end: Optional[int] = None
) -> Dict[str, Any]:
"""
通过设备名称和资源名称获取读数
:param device_name: 设备名称(如 CoalFlowMeter1)
:param resource_name: 资源名称(如 FlowRate1)
:param offset: 偏移量(用于分页)
:param limit: 返回数据条数限制
:param start: 开始时间戳(纳秒)
:param end: 结束时间戳(纳秒)
:return: 包含读数数据的字典
"""
try:
url = f"{self.base_url}/api/v3/reading/device/name/{device_name}/resourceName/{resource_name}"
params = {}
if limit:
params["limit"] = limit
if start:
params["start"] = start
if end:
params["end"] = end
if offset:
params["offset"] = offset
response = requests.get(url, params=params)
response.raise_for_status()
# ctx.logger().info(f"CoreDataGetter: log response: {response.json()}")
return {
"success": True,
"status_code": response.status_code,
"device_name": device_name,
"resource_name": resource_name,
"data": response.json()
}
except requests.exceptions.RequestException as e:
return {
"success": False,
"error": str(e),
"device_name": device_name,
"resource_name": resource_name
}
`
get_readings_by_device_and_resource:can get core-data data
get_readings_by_device_and_resource_by_client:can not get core-data data
level=ERROR ts=2026-05-27 13:20:54,458 app=app-collaborative-control source=logger.py:155 msg=error processing message in pipeline default-pipeline for envelope f6f14972-73e4-4cbe-a20f-339faaa73583: failed to parse the response body -> 'id'`
import json
import uuid
import requests
from typing import Optional, Dict, Any, Tuple
from app_functions_sdk_py.contracts import errors
from app_functions_sdk_py.contracts.clients.utils.common import convert_any_to_dict
from app_functions_sdk_py.contracts.dtos.event import Event
from app_functions_sdk_py.interfaces import AppFunctionContext
class CoreDataGetter:
"""EdgeX 数据获取器"""