Skip to content

reading_client cannot retrieve data from core-data #24

@yqhsmile

Description

@yqhsmile

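get_readings_by_device_and_resource (direct REST call made with requests): retrieves data from core-data successfully.
get_readings_by_device_and_resource_by_client (call made through reading_client): fails to retrieve data from core-data and produces the error below.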

level=ERROR ts=2026-05-27 13:20:54,458 app=app-collaborative-control source=logger.py:155 msg=error processing message in pipeline default-pipeline for envelope f6f14972-73e4-4cbe-a20f-339faaa73583: failed to parse the response body -> 'id'

`
import json
import uuid
import requests
from typing import Optional, Dict, Any, Tuple
from app_functions_sdk_py.contracts import errors
from app_functions_sdk_py.contracts.clients.utils.common import convert_any_to_dict
from app_functions_sdk_py.contracts.dtos.event import Event
from app_functions_sdk_py.interfaces import AppFunctionContext

class CoreDataGetter:
    """Pipeline helper that fetches readings from EdgeX Core Data.

    Two retrieval paths are provided: one goes through the injected reading
    client, the other issues a direct REST request with ``requests``.
    """

    # Look-back window used by ``get_readings``: ten minutes, in nanoseconds.
    QUERY_WINDOW_NS = 10 * 60 * 1_000_000_000
    # Device/resource queried by ``get_readings``.
    DEVICE_NAME = "CoalFlowMeter1"
    RESOURCE_NAME = "FlowRate1"

    def __init__(self, reading_client, base_url: str = "http://localhost:59880"):
        """Store the collaborators used to reach Core Data.

        :param reading_client: client object exposing
            ``readings_by_device_name_and_resource_name_and_time_range``
        :param base_url: base address of the EdgeX Core Data service
        """
        self.reading_client = reading_client
        self.base_url = base_url

    def get_readings(self, ctx: AppFunctionContext, data: Any) -> Tuple[bool, Any]:
        """Query the last ten minutes of readings through the client and log them.

        :param ctx: application function context; its logger and pipeline id
            are used
        :param data: incoming pipeline payload, which must be an ``Event``
        :return: ``(True, data)`` with the event passed through unchanged, or
            ``(False, error)`` when ``data`` is missing or is not an ``Event``
        """
        import time

        if data is None:
            return False, errors.new_common_edgex(
                errors.ErrKind.CONTRACT_INVALID, "CoreDataGetter: No Data Received"
            )

        if not isinstance(data, Event):
            return False, errors.new_common_edgex(
                errors.ErrKind.CONTRACT_INVALID,
                f"CoreDataGetter: type received is not an Event in pipeline '{ctx.pipeline_id()}'",
            )

        logger = ctx.logger()
        logger.info(f"CoreDataGetter: log event_id: '{data.id}'")
        logger.info("################################################")

        # time_ns() yields an exact integer; multiplying the float returned by
        # time.time() by 1e9 cannot represent individual nanoseconds.
        end_time = time.time_ns()
        start_time = end_time - self.QUERY_WINDOW_NS

        # No exception handling here: a failure raised by the client
        # propagates to whoever invoked this pipeline function.
        readings_by_client = self.get_readings_by_device_and_resource_by_client(
            ctx,
            device_name=self.DEVICE_NAME,
            resource_name=self.RESOURCE_NAME,
            offset=0,
            limit=20,
            start=start_time,
            end=end_time,
        )
        logger.info(f"CoreDataGetter: log readings_by_client: {readings_by_client}")
        logger.info("################################################")

        return True, data

    def get_readings_by_device_and_resource_by_client(
        self,
        ctx: AppFunctionContext,
        device_name: str,
        resource_name: str,
        offset: Optional[int] = None,
        limit: Optional[int] = None,
        start: Optional[int] = None,
        end: Optional[int] = None,
    ) -> Dict[str, Any]:
        """Fetch readings for a device resource through the reading client.

        :param ctx: application function context; accepted for signature
            parity with the REST variant but not forwarded to the client
        :param device_name: device name (e.g. CoalFlowMeter1)
        :param resource_name: resource name (e.g. FlowRate1)
        :param offset: pagination offset
        :param limit: maximum number of readings to return
        :param start: start of the time range
        :param end: end of the time range
        :return: whatever the client call returns, unmodified
            (NOTE(review): the annotation says dict; confirm against the
            client's actual return type)
        """
        # A single request carries a single correlation id; it is published
        # under both key spellings so either lookup finds the same value.
        correlation_id = str(uuid.uuid4())
        request_ctx = {
            "X-Correlation-ID": correlation_id,
            "correlation-id": correlation_id,
        }

        return self.reading_client.readings_by_device_name_and_resource_name_and_time_range(
            ctx=request_ctx,
            device_name=device_name,
            resource_name=resource_name,
            offset=offset,
            limit=limit,
            start=start,
            end=end,
        )

    def get_readings_by_device_and_resource(
        self,
        ctx: AppFunctionContext,
        device_name: str,
        resource_name: str,
        offset: Optional[int] = None,
        limit: Optional[int] = None,
        start: Optional[int] = None,
        end: Optional[int] = None,
        timeout: float = 10.0,
    ) -> Dict[str, Any]:
        """Fetch readings for a device resource with a direct REST request.

        :param ctx: application function context (currently unused)
        :param device_name: device name (e.g. CoalFlowMeter1)
        :param resource_name: resource name (e.g. FlowRate1)
        :param offset: pagination offset
        :param limit: maximum number of readings to return
        :param start: start timestamp (nanoseconds)
        :param end: end timestamp (nanoseconds)
        :param timeout: seconds to wait for the HTTP request before giving up
        :return: dict with ``success``, ``device_name`` and ``resource_name``;
            on success also ``status_code`` and ``data`` (the decoded JSON
            body), on a ``requests`` failure also ``error`` (the message)
        """
        from urllib.parse import quote

        # Percent-encode the names so characters such as "/", "?" or "#"
        # cannot alter the request path.
        device_segment = quote(device_name, safe="")
        resource_segment = quote(resource_name, safe="")
        url = (
            f"{self.base_url}/api/v3/reading/device/name/{device_segment}"
            f"/resourceName/{resource_segment}"
        )

        # Only None means "not supplied"; an explicit 0 is forwarded as given.
        # NOTE(review): start/end are sent as query parameters here; confirm
        # that Core Data honors them on this route.
        candidates = {"limit": limit, "start": start, "end": end, "offset": offset}
        params = {key: value for key, value in candidates.items() if value is not None}

        try:
            # Without a timeout an unresponsive server would block forever.
            response = requests.get(url, params=params, timeout=timeout)
            response.raise_for_status()
            payload = response.json()
        except requests.exceptions.RequestException as e:
            return {
                "success": False,
                "error": str(e),
                "device_name": device_name,
                "resource_name": resource_name,
            }

        return {
            "success": True,
            "status_code": response.status_code,
            "device_name": device_name,
            "resource_name": resource_name,
            "data": payload,
        }
`

Metadata

Metadata

Assignees

No one assigned

    Labels

    bug (Something isn't working), help wanted (Extra attention is needed)

    Type

    No type
    No fields configured for issues without a type.

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions