Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/microsoft/opentelemetry/_genai/_langchain/_tracer.py
Original file line number Diff line number Diff line change
Expand Up @@ -564,6 +564,9 @@ def _update_span(span: Span, run: Run) -> LLMInvocation | None:
dict(
flatten(
chain(
prompts(run.inputs),
input_messages(run.inputs),
output_messages(run.outputs),
invocation_parameters(run),
function_calls(run.outputs),
metadata(run),
Expand Down
311 changes: 241 additions & 70 deletions src/microsoft/opentelemetry/_genai/_langchain/_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -251,15 +251,28 @@ def input_messages(
return
if not isinstance(inputs, Mapping):
return
if not (multiple_messages := inputs.get("messages")):
return
if not isinstance(multiple_messages, Iterable):
return
if not (first_messages := next(iter(multiple_messages), None)):
return
contents: list[str] = []
if isinstance(first_messages, list):
for message_data in first_messages:
multiple_messages = inputs.get("messages")
if multiple_messages and isinstance(multiple_messages, Iterable):
# LangChain can provide either:
# - nested format: {"messages": [[...]]}
# - flat format: {"messages": [...]} (list of message objects)
if isinstance(multiple_messages, list) and multiple_messages:
first_item = multiple_messages[0]
if isinstance(first_item, list):
message_items: list[Any] = first_item
else:
message_items = multiple_messages
else:
first_messages = next(iter(multiple_messages), None)
if isinstance(first_messages, list):
message_items = first_messages
elif first_messages is not None:
message_items = [first_messages]
else:
message_items = []

for message_data in message_items:
if isinstance(message_data, BaseMessage):
if hasattr(message_data, "content") and message_data.content:
contents.append(str(message_data.content))
Expand All @@ -269,15 +282,17 @@ def input_messages(
elif kwargs := message_data.get("kwargs"):
if hasattr(kwargs, "get") and (content := kwargs.get("content")):
contents.append(str(content))
elif isinstance(first_messages, BaseMessage):
if hasattr(first_messages, "content") and first_messages.content:
contents.append(str(first_messages.content))
elif hasattr(first_messages, "get"):
if content := first_messages.get("content"):
contents.append(str(content))
elif isinstance(first_messages, Sequence) and len(first_messages) == 2:
_role, content = first_messages
contents.append(str(content))
elif isinstance(message_data, Sequence) and len(message_data) == 2:
_role, content = message_data
contents.append(str(content))

# Some providers flatten chat input to prompts for chat_model/llm runs.
if not contents and (prompt_values := inputs.get("prompts")):
if isinstance(prompt_values, list):
contents.extend(str(p) for p in prompt_values if p)
elif isinstance(prompt_values, str):
contents.append(prompt_values)

if contents:
yield GEN_AI_INPUT_MESSAGES_KEY, safe_json_dumps(contents)

Expand Down Expand Up @@ -313,12 +328,28 @@ def output_messages(
return
if not isinstance(multiple_generations, Iterable):
return
if not (first_generations := next(iter(multiple_generations), None)):
return
if not isinstance(first_generations, Iterable):
return

if isinstance(multiple_generations, list) and multiple_generations:
first_item = multiple_generations[0]
if isinstance(first_item, list):
generation_items: list[Any] = first_item
elif isinstance(first_item, Mapping):
generation_items = multiple_generations
else:
generation_items = []
else:
first_generations = next(iter(multiple_generations), None)
if first_generations is None:
return
if isinstance(first_generations, list):
generation_items = first_generations
elif isinstance(first_generations, Mapping):
generation_items = [first_generations]
else:
return

contents: list[str] = []
for generation in first_generations:
for generation in generation_items:
if not isinstance(generation, Mapping):
continue
if message_data := generation.get("message"):
Expand Down Expand Up @@ -392,65 +423,205 @@ def model_name(
return


def _as_non_negative_int(value: Any) -> int | None:
if isinstance(value, bool):
return None
try:
parsed = int(value)
except (TypeError, ValueError):
return None
return parsed if parsed >= 0 else None


def _as_usage_mapping(raw_usage: Any) -> Mapping[str, Any] | None:
if isinstance(raw_usage, Mapping):
return raw_usage

if callable(model_dump := getattr(raw_usage, "model_dump", None)):
try:
dumped = model_dump(exclude_none=True)
except TypeError:
dumped = model_dump()
if isinstance(dumped, Mapping):
return dumped

if callable(dict_method := getattr(raw_usage, "dict", None)):
try:
dumped = dict_method(exclude_none=True)
except TypeError:
dumped = dict_method()
if isinstance(dumped, Mapping):
return dumped

usage: dict[str, Any] = {}
for scalar_key in (
"prompt_tokens",
"input_tokens",
"prompt_token_count",
"completion_tokens",
"output_tokens",
"candidates_token_count",
"total_tokens",
"total_token_count",
):
scalar_value = getattr(raw_usage, scalar_key, None)
if scalar_value is not None:
usage[scalar_key] = scalar_value

for nested_key in (
"input_token_details",
"output_token_details",
"prompt_tokens_details",
"completion_tokens_details",
):
nested_value = getattr(raw_usage, nested_key, None)
if nested_value is None:
continue
if nested_mapping := _as_usage_mapping(nested_value):
usage[nested_key] = nested_mapping

return usage or None


def _normalized_token_usage(outputs: Mapping[str, Any] | None) -> dict[str, int]:
if not (token_usage := _parse_token_usage(outputs)):
return {}
input_count = _as_non_negative_int(
get_first_value(token_usage, ("prompt_tokens", "input_tokens", "prompt_token_count"))
)
output_count = _as_non_negative_int(
get_first_value(token_usage, ("completion_tokens", "output_tokens", "candidates_token_count"))
)
total_count = _as_non_negative_int(get_first_value(token_usage, ("total_tokens", "total_token_count")))
if total_count is None and input_count is not None and output_count is not None:
total_count = input_count + output_count

usage: dict[str, int] = {}
if input_count is not None:
usage["input_tokens"] = input_count
if output_count is not None:
usage["output_tokens"] = output_count
if total_count is not None:
# Kept internal for parity/future metrics, not emitted as a semconv span attribute.
usage["total_tokens"] = total_count
return usage


@stop_on_exception
def token_counts(outputs: Mapping[str, Any] | None) -> Iterator[tuple[str, int]]:
if not (token_usage := _parse_token_usage(outputs)):
return
for attribute_name, keys in [
(GEN_AI_USAGE_INPUT_TOKENS_KEY, ("prompt_tokens", "input_tokens", "prompt_token_count")),
(GEN_AI_USAGE_OUTPUT_TOKENS_KEY, ("completion_tokens", "output_tokens", "candidates_token_count")),
]:
if (token_count := get_first_value(token_usage, keys)) is not None:
yield attribute_name, token_count
# langchain_core UsageMetadata
for attribute_name, details_key, keys in [ # type: ignore[assignment]
(GEN_AI_USAGE_INPUT_TOKENS_KEY, None, ("input_tokens",)),
(GEN_AI_USAGE_OUTPUT_TOKENS_KEY, None, ("output_tokens",)),
]:
details = token_usage.get(details_key) if details_key else token_usage
if details is not None:
if (token_count := get_first_value(details, keys)) is not None:
yield attribute_name, token_count
usage = _normalized_token_usage(outputs)
if (input_tokens := usage.get("input_tokens")) is not None:
yield GEN_AI_USAGE_INPUT_TOKENS_KEY, input_tokens
if (output_tokens := usage.get("output_tokens")) is not None:
yield GEN_AI_USAGE_OUTPUT_TOKENS_KEY, output_tokens


def _iter_generation_response_metadata(outputs: Mapping[str, Any] | None) -> Iterator[Mapping[str, Any]]:
"""Yield ``response_metadata`` / ``generation_info`` mappings on each generation."""
def _iter_generation_mappings(outputs: Mapping[str, Any] | None) -> Iterator[Mapping[str, Any]]:
"""Yield generation mappings from both nested and flat generations payloads."""
if not isinstance(outputs, Mapping):
return
multiple_generations = outputs.get("generations")
if not isinstance(multiple_generations, Iterable):
generations = outputs.get("generations")
if not isinstance(generations, Iterable):
return
for first_generations in multiple_generations:
if not isinstance(first_generations, Iterable):
continue

if isinstance(generations, list):
if not generations:
return
first_item = generations[0]
# Nested shape: generations = [[{...}, {...}], ...]
if isinstance(first_item, list):
for generation in first_item:
if isinstance(generation, Mapping):
yield generation
return
# Flat shape: generations = [{...}, {...}]
if isinstance(first_item, Mapping):
for generation in generations:
if isinstance(generation, Mapping):
yield generation
return

# Generic iterable fallback
first_generations = next(iter(generations), None)
if first_generations is None:
return
if isinstance(first_generations, Mapping):
yield first_generations
return
if isinstance(first_generations, Iterable):
for generation in first_generations:
if not isinstance(generation, Mapping):
continue
gen_info = generation.get("generation_info")
if isinstance(gen_info, Mapping):
yield gen_info
message_data = generation.get("message")
if isinstance(message_data, BaseMessage):
meta = getattr(message_data, "response_metadata", None)
elif isinstance(message_data, Mapping):
meta = message_data.get("response_metadata")
if meta is None and isinstance(kwargs := message_data.get("kwargs"), Mapping):
meta = kwargs.get("response_metadata")
else:
meta = None
if isinstance(meta, Mapping):
yield meta
if isinstance(generation, Mapping):
yield generation
return


def _iter_generation_response_metadata(outputs: Mapping[str, Any] | None) -> Iterator[Mapping[str, Any]]:
"""Yield ``response_metadata`` / ``generation_info`` mappings on each generation."""
for generation in _iter_generation_mappings(outputs):
gen_info = generation.get("generation_info")
if isinstance(gen_info, Mapping):
yield gen_info
message_data = generation.get("message")
if isinstance(message_data, BaseMessage):
meta = getattr(message_data, "response_metadata", None)
elif isinstance(message_data, Mapping):
meta = message_data.get("response_metadata")
if meta is None and isinstance(kwargs := message_data.get("kwargs"), Mapping):
meta = kwargs.get("response_metadata")
else:
meta = None
if isinstance(meta, Mapping):
yield meta


def _parse_token_usage(outputs: Mapping[str, Any] | None) -> Any:
if (
outputs
and hasattr(outputs, "get")
and (llm_output := outputs.get("llm_output"))
and hasattr(llm_output, "get")
and (token_usage := get_first_value(llm_output, ("token_usage", "usage")))
):
return token_usage
if outputs and hasattr(outputs, "get") and (llm_output := outputs.get("llm_output")) and hasattr(llm_output, "get"):
if usage := get_first_value(llm_output, ("token_usage", "usage")):
if usage_mapping := _as_usage_mapping(usage):
return usage_mapping

for generation in _iter_generation_mappings(outputs):
for usage_candidate in (
generation.get("token_usage"),
generation.get("usage"),
generation.get("usage_metadata"),
):
if usage_candidate and (usage_mapping := _as_usage_mapping(usage_candidate)):
return usage_mapping

if isinstance(generation_info := generation.get("generation_info"), Mapping):
for usage_candidate in (
generation_info.get("token_usage"),
generation_info.get("usage"),
generation_info,
):
if usage_candidate and (usage_mapping := _as_usage_mapping(usage_candidate)):
return usage_mapping

message_data = generation.get("message")
if isinstance(message_data, BaseMessage):
for usage_candidate in (
getattr(message_data, "usage_metadata", None),
get_first_value(getattr(message_data, "response_metadata", {}), ("token_usage", "usage")),
getattr(message_data, "response_metadata", None),
):
if usage_candidate and (usage_mapping := _as_usage_mapping(usage_candidate)):
return usage_mapping
elif isinstance(message_data, Mapping):
kwargs = message_data.get("kwargs") if isinstance(message_data.get("kwargs"), Mapping) else {}
response_meta = message_data.get("response_metadata")
if not isinstance(response_meta, Mapping):
response_meta = kwargs.get("response_metadata") if isinstance(kwargs, Mapping) else None

for usage_candidate in (
message_data.get("usage_metadata"),
kwargs.get("usage_metadata") if isinstance(kwargs, Mapping) else None,
get_first_value(response_meta, ("token_usage", "usage")) if isinstance(response_meta, Mapping) else None,
response_meta,
):
if usage_candidate and (usage_mapping := _as_usage_mapping(usage_candidate)):
return usage_mapping

return None


Expand Down
Loading
Loading