import tempfile
from copy import deepcopy
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional, Sequence
from langchain_core.agents import AgentAction, AgentFinish
from langchain_core.callbacks import BaseCallbackHandler
from langchain_core.outputs import Generation, LLMResult
import langchain_community
from langchain_community.callbacks.utils import (
BaseMetadataCallbackHandler,
flatten_dict,
import_pandas,
import_spacy,
import_textstat,
)
LANGCHAIN_MODEL_NAME = "langchain-model"
[docs]def import_comet_ml() -> Any:
"""Import comet_ml and raise an error if it is not installed."""
try:
import comet_ml # noqa: F401
except ImportError:
raise ImportError(
"To use the comet_ml callback manager you need to have the "
"`comet_ml` python package installed. Please install it with"
" `pip install comet_ml`"
)
return comet_ml
def _get_experiment(
workspace: Optional[str] = None, project_name: Optional[str] = None
) -> Any:
comet_ml = import_comet_ml()
experiment = comet_ml.Experiment(
workspace=workspace,
project_name=project_name,
)
return experiment
def _fetch_text_complexity_metrics(text: str) -> dict:
textstat = import_textstat()
text_complexity_metrics = {
"flesch_reading_ease": textstat.flesch_reading_ease(text),
"flesch_kincaid_grade": textstat.flesch_kincaid_grade(text),
"smog_index": textstat.smog_index(text),
"coleman_liau_index": textstat.coleman_liau_index(text),
"automated_readability_index": textstat.automated_readability_index(text),
"dale_chall_readability_score": textstat.dale_chall_readability_score(text),
"difficult_words": textstat.difficult_words(text),
"linsear_write_formula": textstat.linsear_write_formula(text),
"gunning_fog": textstat.gunning_fog(text),
"text_standard": textstat.text_standard(text),
"fernandez_huerta": textstat.fernandez_huerta(text),
"szigriszt_pazos": textstat.szigriszt_pazos(text),
"gutierrez_polini": textstat.gutierrez_polini(text),
"crawford": textstat.crawford(text),
"gulpease_index": textstat.gulpease_index(text),
"osman": textstat.osman(text),
}
return text_complexity_metrics
def _summarize_metrics_for_generated_outputs(metrics: Sequence) -> dict:
pd = import_pandas()
metrics_df = pd.DataFrame(metrics)
metrics_summary = metrics_df.describe()
return metrics_summary.to_dict()
[docs]class CometCallbackHandler(BaseMetadataCallbackHandler, BaseCallbackHandler):
"""Callback Handler that logs to Comet.
Parameters:
job_type (str): The type of comet_ml task such as "inference",
"testing" or "qc"
project_name (str): The comet_ml project name
tags (list): Tags to add to the task
task_name (str): Name of the comet_ml task
visualize (bool): Whether to visualize the run.
complexity_metrics (bool): Whether to log complexity metrics
stream_logs (bool): Whether to stream callback actions to Comet
This handler will utilize the associated callback method and formats
the input of each callback function with metadata regarding the state of LLM run,
and adds the response to the list of records for both the {method}_records and
action. It then logs the response to Comet.
"""
[docs] def __init__(
self,
task_type: Optional[str] = "inference",
workspace: Optional[str] = None,
project_name: Optional[str] = None,
tags: Optional[Sequence] = None,
name: Optional[str] = None,
visualizations: Optional[List[str]] = None,
complexity_metrics: bool = False,
custom_metrics: Optional[Callable] = None,
stream_logs: bool = True,
) -> None:
"""Initialize callback handler."""
self.comet_ml = import_comet_ml()
super().__init__()
self.task_type = task_type
self.workspace = workspace
self.project_name = project_name
self.tags = tags
self.visualizations = visualizations
self.complexity_metrics = complexity_metrics
self.custom_metrics = custom_metrics
self.stream_logs = stream_logs
self.temp_dir = tempfile.TemporaryDirectory()
self.experiment = _get_experiment(workspace, project_name)
self.experiment.log_other("Created from", "langchain")
if tags:
self.experiment.add_tags(tags)
self.name = name
if self.name:
self.experiment.set_name(self.name)
warning = (
"The comet_ml callback is currently in beta and is subject to change "
"based on updates to `langchain`. Please report any issues to "
"https://github.com/comet-ml/issue-tracking/issues with the tag "
"`langchain`."
)
self.comet_ml.LOGGER.warning(warning)
self.callback_columns: list = []
self.action_records: list = []
self.complexity_metrics = complexity_metrics
if self.visualizations:
spacy = import_spacy()
self.nlp = spacy.load("en_core_web_sm")
else:
self.nlp = None
def _init_resp(self) -> Dict:
return {k: None for k in self.callback_columns}
[docs] def on_llm_start(
self, serialized: Dict[str, Any], prompts: List[str], **kwargs: Any
) -> None:
"""Run when LLM starts."""
self.step += 1
self.llm_starts += 1
self.starts += 1
metadata = self._init_resp()
metadata.update({"action": "on_llm_start"})
metadata.update(flatten_dict(serialized))
metadata.update(self.get_custom_callback_meta())
for prompt in prompts:
prompt_resp = deepcopy(metadata)
prompt_resp["prompts"] = prompt
self.on_llm_start_records.append(prompt_resp)
self.action_records.append(prompt_resp)
if self.stream_logs:
self._log_stream(prompt, metadata, self.step)
[docs] def on_llm_new_token(self, token: str, **kwargs: Any) -> None:
"""Run when LLM generates a new token."""
self.step += 1
self.llm_streams += 1
resp = self._init_resp()
resp.update({"action": "on_llm_new_token", "token": token})
resp.update(self.get_custom_callback_meta())
self.action_records.append(resp)
[docs] def on_llm_end(self, response: LLMResult, **kwargs: Any) -> None:
"""Run when LLM ends running."""
self.step += 1
self.llm_ends += 1
self.ends += 1
metadata = self._init_resp()
metadata.update({"action": "on_llm_end"})
metadata.update(flatten_dict(response.llm_output or {}))
metadata.update(self.get_custom_callback_meta())
output_complexity_metrics = []
output_custom_metrics = []
for prompt_idx, generations in enumerate(response.generations):
for gen_idx, generation in enumerate(generations):
text = generation.text
generation_resp = deepcopy(metadata)
generation_resp.update(flatten_dict(generation.dict()))
complexity_metrics = self._get_complexity_metrics(text)
if complexity_metrics:
output_complexity_metrics.append(complexity_metrics)
generation_resp.update(complexity_metrics)
custom_metrics = self._get_custom_metrics(
generation, prompt_idx, gen_idx
)
if custom_metrics:
output_custom_metrics.append(custom_metrics)
generation_resp.update(custom_metrics)
if self.stream_logs:
self._log_stream(text, metadata, self.step)
self.action_records.append(generation_resp)
self.on_llm_end_records.append(generation_resp)
self._log_text_metrics(output_complexity_metrics, step=self.step)
self._log_text_metrics(output_custom_metrics, step=self.step)
[docs] def on_llm_error(self, error: BaseException, **kwargs: Any) -> None:
"""Run when LLM errors."""
self.step += 1
self.errors += 1
[docs] def on_chain_start(
self, serialized: Dict[str, Any], inputs: Dict[str, Any], **kwargs: Any
) -> None:
"""Run when chain starts running."""
self.step += 1
self.chain_starts += 1
self.starts += 1
resp = self._init_resp()
resp.update({"action": "on_chain_start"})
resp.update(flatten_dict(serialized))
resp.update(self.get_custom_callback_meta())
for chain_input_key, chain_input_val in inputs.items():
if isinstance(chain_input_val, str):
input_resp = deepcopy(resp)
if self.stream_logs:
self._log_stream(chain_input_val, resp, self.step)
input_resp.update({chain_input_key: chain_input_val})
self.action_records.append(input_resp)
else:
self.comet_ml.LOGGER.warning(
f"Unexpected data format provided! "
f"Input Value for {chain_input_key} will not be logged"
)
[docs] def on_chain_end(self, outputs: Dict[str, Any], **kwargs: Any) -> None:
"""Run when chain ends running."""
self.step += 1
self.chain_ends += 1
self.ends += 1
resp = self._init_resp()
resp.update({"action": "on_chain_end"})
resp.update(self.get_custom_callback_meta())
for chain_output_key, chain_output_val in outputs.items():
if isinstance(chain_output_val, str):
output_resp = deepcopy(resp)
if self.stream_logs:
self._log_stream(chain_output_val, resp, self.step)
output_resp.update({chain_output_key: chain_output_val})
self.action_records.append(output_resp)
else:
self.comet_ml.LOGGER.warning(
f"Unexpected data format provided! "
f"Output Value for {chain_output_key} will not be logged"
)
[docs] def on_chain_error(self, error: BaseException, **kwargs: Any) -> None:
"""Run when chain errors."""
self.step += 1
self.errors += 1
[docs] def on_text(self, text: str, **kwargs: Any) -> None:
"""
Run when agent is ending.
"""
self.step += 1
self.text_ctr += 1
resp = self._init_resp()
resp.update({"action": "on_text"})
resp.update(self.get_custom_callback_meta())
if self.stream_logs:
self._log_stream(text, resp, self.step)
resp.update({"text": text})
self.action_records.append(resp)
[docs] def on_agent_finish(self, finish: AgentFinish, **kwargs: Any) -> None:
"""Run when agent ends running."""
self.step += 1
self.agent_ends += 1
self.ends += 1
resp = self._init_resp()
output = finish.return_values["output"]
log = finish.log
resp.update({"action": "on_agent_finish", "log": log})
resp.update(self.get_custom_callback_meta())
if self.stream_logs:
self._log_stream(output, resp, self.step)
resp.update({"output": output})
self.action_records.append(resp)
[docs] def on_agent_action(self, action: AgentAction, **kwargs: Any) -> Any:
"""Run on agent action."""
self.step += 1
self.tool_starts += 1
self.starts += 1
tool = action.tool
tool_input = str(action.tool_input)
log = action.log
resp = self._init_resp()
resp.update({"action": "on_agent_action", "log": log, "tool": tool})
resp.update(self.get_custom_callback_meta())
if self.stream_logs:
self._log_stream(tool_input, resp, self.step)
resp.update({"tool_input": tool_input})
self.action_records.append(resp)
def _get_complexity_metrics(self, text: str) -> dict:
"""Compute text complexity metrics using textstat.
Parameters:
text (str): The text to analyze.
Returns:
(dict): A dictionary containing the complexity metrics.
"""
resp = {}
if self.complexity_metrics:
text_complexity_metrics = _fetch_text_complexity_metrics(text)
resp.update(text_complexity_metrics)
return resp
def _get_custom_metrics(
self, generation: Generation, prompt_idx: int, gen_idx: int
) -> dict:
"""Compute Custom Metrics for an LLM Generated Output
Args:
generation (LLMResult): Output generation from an LLM
prompt_idx (int): List index of the input prompt
gen_idx (int): List index of the generated output
Returns:
dict: A dictionary containing the custom metrics.
"""
resp = {}
if self.custom_metrics:
custom_metrics = self.custom_metrics(generation, prompt_idx, gen_idx)
resp.update(custom_metrics)
return resp
[docs] def flush_tracker(
self,
langchain_asset: Any = None,
task_type: Optional[str] = "inference",
workspace: Optional[str] = None,
project_name: Optional[str] = "comet-langchain-demo",
tags: Optional[Sequence] = None,
name: Optional[str] = None,
visualizations: Optional[List[str]] = None,
complexity_metrics: bool = False,
custom_metrics: Optional[Callable] = None,
finish: bool = False,
reset: bool = False,
) -> None:
"""Flush the tracker and setup the session.
Everything after this will be a new table.
Args:
name: Name of the performed session so far so it is identifiable
langchain_asset: The langchain asset to save.
finish: Whether to finish the run.
Returns:
None
"""
self._log_session(langchain_asset)
if langchain_asset:
try:
self._log_model(langchain_asset)
except Exception:
self.comet_ml.LOGGER.error(
"Failed to export agent or LLM to Comet",
exc_info=True,
extra={"show_traceback": True},
)
if finish:
self.experiment.end()
if reset:
self._reset(
task_type,
workspace,
project_name,
tags,
name,
visualizations,
complexity_metrics,
custom_metrics,
)
def _log_stream(self, prompt: str, metadata: dict, step: int) -> None:
self.experiment.log_text(prompt, metadata=metadata, step=step)
def _log_model(self, langchain_asset: Any) -> None:
model_parameters = self._get_llm_parameters(langchain_asset)
self.experiment.log_parameters(model_parameters, prefix="model")
langchain_asset_path = Path(self.temp_dir.name, "model.json")
model_name = self.name if self.name else LANGCHAIN_MODEL_NAME
try:
if hasattr(langchain_asset, "save"):
langchain_asset.save(langchain_asset_path)
self.experiment.log_model(model_name, str(langchain_asset_path))
except (ValueError, AttributeError, NotImplementedError) as e:
if hasattr(langchain_asset, "save_agent"):
langchain_asset.save_agent(langchain_asset_path)
self.experiment.log_model(model_name, str(langchain_asset_path))
else:
self.comet_ml.LOGGER.error(
f"{e}"
" Could not save Langchain Asset "
f"for {langchain_asset.__class__.__name__}"
)
def _log_session(self, langchain_asset: Optional[Any] = None) -> None:
try:
llm_session_df = self._create_session_analysis_dataframe(langchain_asset)
# Log the cleaned dataframe as a table
self.experiment.log_table("langchain-llm-session.csv", llm_session_df)
except Exception:
self.comet_ml.LOGGER.warning(
"Failed to log session data to Comet",
exc_info=True,
extra={"show_traceback": True},
)
try:
metadata = {"langchain_version": str(langchain_community.__version__)}
# Log the langchain low-level records as a JSON file directly
self.experiment.log_asset_data(
self.action_records, "langchain-action_records.json", metadata=metadata
)
except Exception:
self.comet_ml.LOGGER.warning(
"Failed to log session data to Comet",
exc_info=True,
extra={"show_traceback": True},
)
try:
self._log_visualizations(llm_session_df)
except Exception:
self.comet_ml.LOGGER.warning(
"Failed to log visualizations to Comet",
exc_info=True,
extra={"show_traceback": True},
)
def _log_text_metrics(self, metrics: Sequence[dict], step: int) -> None:
if not metrics:
return
metrics_summary = _summarize_metrics_for_generated_outputs(metrics)
for key, value in metrics_summary.items():
self.experiment.log_metrics(value, prefix=key, step=step)
def _log_visualizations(self, session_df: Any) -> None:
if not (self.visualizations and self.nlp):
return
spacy = import_spacy()
prompts = session_df["prompts"].tolist()
outputs = session_df["text"].tolist()
for idx, (prompt, output) in enumerate(zip(prompts, outputs)):
doc = self.nlp(output)
sentence_spans = list(doc.sents)
for visualization in self.visualizations:
try:
html = spacy.displacy.render(
sentence_spans,
style=visualization,
options={"compact": True},
jupyter=False,
page=True,
)
self.experiment.log_asset_data(
html,
name=f"langchain-viz-{visualization}-{idx}.html",
metadata={"prompt": prompt},
step=idx,
)
except Exception as e:
self.comet_ml.LOGGER.warning(
e, exc_info=True, extra={"show_traceback": True}
)
return
def _reset(
self,
task_type: Optional[str] = None,
workspace: Optional[str] = None,
project_name: Optional[str] = None,
tags: Optional[Sequence] = None,
name: Optional[str] = None,
visualizations: Optional[List[str]] = None,
complexity_metrics: bool = False,
custom_metrics: Optional[Callable] = None,
) -> None:
_task_type = task_type if task_type else self.task_type
_workspace = workspace if workspace else self.workspace
_project_name = project_name if project_name else self.project_name
_tags = tags if tags else self.tags
_name = name if name else self.name
_visualizations = visualizations if visualizations else self.visualizations
_complexity_metrics = (
complexity_metrics if complexity_metrics else self.complexity_metrics
)
_custom_metrics = custom_metrics if custom_metrics else self.custom_metrics
self.__init__( # type: ignore
task_type=_task_type,
workspace=_workspace,
project_name=_project_name,
tags=_tags,
name=_name,
visualizations=_visualizations,
complexity_metrics=_complexity_metrics,
custom_metrics=_custom_metrics,
)
self.reset_callback_meta()
self.temp_dir = tempfile.TemporaryDirectory()
def _create_session_analysis_dataframe(self, langchain_asset: Any = None) -> dict:
pd = import_pandas()
llm_parameters = self._get_llm_parameters(langchain_asset)
num_generations_per_prompt = llm_parameters.get("n", 1)
llm_start_records_df = pd.DataFrame(self.on_llm_start_records)
# Repeat each input row based on the number of outputs generated per prompt
llm_start_records_df = llm_start_records_df.loc[
llm_start_records_df.index.repeat(num_generations_per_prompt)
].reset_index(drop=True)
llm_end_records_df = pd.DataFrame(self.on_llm_end_records)
llm_session_df = pd.merge(
llm_start_records_df,
llm_end_records_df,
left_index=True,
right_index=True,
suffixes=["_llm_start", "_llm_end"],
)
return llm_session_df
def _get_llm_parameters(self, langchain_asset: Any = None) -> dict:
if not langchain_asset:
return {}
try:
if hasattr(langchain_asset, "agent"):
llm_parameters = langchain_asset.agent.llm_chain.llm.dict()
elif hasattr(langchain_asset, "llm_chain"):
llm_parameters = langchain_asset.llm_chain.llm.dict()
elif hasattr(langchain_asset, "llm"):
llm_parameters = langchain_asset.llm.dict()
else:
llm_parameters = langchain_asset.dict()
except Exception:
return {}
return llm_parameters