Source code for langchain_community.callbacks.fiddler_callback

import time
from typing import Any, Dict, List

from langchain_core.callbacks import BaseCallbackHandler
from langchain_core.outputs import LLMResult

from langchain_community.callbacks.utils import import_pandas

# Module-level constants

# Keys used when reading LLM results and callback keyword arguments
TOKEN_USAGE = "token_usage"
TOTAL_TOKENS = "total_tokens"
PROMPT_TOKENS = "prompt_tokens"
COMPLETION_TOKENS = "completion_tokens"
RUN_ID = "run_id"
MODEL_NAME = "model_name"

# Upper bounds used for the numeric columns of the baseline dataset
DEFAULT_MAX_TOKEN = 65_536
DEFAULT_MAX_DURATION = 120

# Column names specific to the Fiddler event schema
PROMPT = "prompt"
RESPONSE = "response"
DURATION = "duration"

# Baseline dataset: ten placeholder rows. Text columns hold a dummy value and
# numeric columns alternate between their lowest and highest sample value.
_dataset_dict = {
    **{column: ["fiddler"] * 10 for column in (PROMPT, RESPONSE, MODEL_NAME)},
    RUN_ID: ["123e4567-e89b-12d3-a456-426614174000"] * 10,
    **{
        column: [0, DEFAULT_MAX_TOKEN] * 5
        for column in (TOTAL_TOKENS, PROMPT_TOKENS, COMPLETION_TOKENS)
    },
    DURATION: [1, DEFAULT_MAX_DURATION] * 5,
}


def import_fiddler() -> Any:
    """Import the fiddler python package and raise an error if it is not installed.

    Returns:
        The imported ``fiddler`` module.

    Raises:
        ImportError: If the ``fiddler`` module cannot be imported. The original
            import failure is attached as the cause.
    """
    try:
        import fiddler  # noqa: F401
    except ImportError as e:
        # The trailing space keeps the two adjacent literals from running
        # together when Python concatenates them.
        raise ImportError(
            "To use fiddler callback handler you need to have `fiddler-client` "
            "package installed. Please install it with `pip install fiddler-client`"
        ) from e
    return fiddler
# First, define custom callback handler implementations
class FiddlerCallbackHandler(BaseCallbackHandler):
    """Callback handler that publishes LLM prompts and responses to Fiddler.

    On construction the handler makes sure the target project, baseline dataset
    and model exist in Fiddler, creating any that are missing. Afterwards every
    completed LLM run is published as a batch of events, one row per prompt.
    """

    def __init__(
        self,
        url: str,
        org: str,
        project: str,
        model: str,
        api_key: str,
    ) -> None:
        """
        Initialize Fiddler callback handler.

        Args:
            url: Fiddler URL (e.g. https://demo.fiddler.ai).
                Make sure to include the protocol (http/https).
            org: Fiddler organization id
            project: Fiddler project name to publish events to
            model: Fiddler model name to publish events to
            api_key: Fiddler authentication token

        Raises:
            Exception: Any error raised by the Fiddler client while creating
                the project, dataset or model is printed and re-raised.
        """
        super().__init__()
        # Initialize Fiddler client and other necessary properties
        self.fdl = import_fiddler()
        self.pd = import_pandas()

        self.url = url
        self.org = org
        self.project = project
        self.model = model
        self.api_key = api_key
        self._df = self.pd.DataFrame(_dataset_dict)

        # Per-run state captured in on_llm_start and consumed in on_llm_end.
        self.run_id_prompts: Dict[str, List[str]] = {}
        self.run_id_starttime: Dict[str, int] = {}

        # Initialize Fiddler client here
        self.fiddler_client = self.fdl.FiddlerApi(url, org_id=org, auth_token=api_key)

        if self.project not in self.fiddler_client.get_project_names():
            print(  # noqa: T201
                f"adding project {self.project}. This only has to be done once."
            )
            try:
                self.fiddler_client.add_project(self.project)
            except Exception as e:
                # Both fragments must be f-strings so that {e} is interpolated.
                print(  # noqa: T201
                    f"Error adding project {self.project}: "
                    f"{e}. Fiddler integration will not work."
                )
                raise

        dataset_info = self.fdl.DatasetInfo.from_dataframe(
            self._df, max_inferred_cardinality=0
        )
        if self.model not in self.fiddler_client.get_dataset_names(self.project):
            print(  # noqa: T201
                f"adding dataset {self.model} to project {self.project}. "
                "This only has to be done once."
            )
            try:
                self.fiddler_client.upload_dataset(
                    project_id=self.project,
                    dataset_id=self.model,
                    dataset={"train": self._df},
                    info=dataset_info,
                )
            except Exception as e:
                print(  # noqa: T201
                    f"Error adding dataset {self.model}: {e}. "
                    "Fiddler integration will not work."
                )
                raise

        model_info = self.fdl.ModelInfo.from_dataset_info(
            dataset_info=dataset_info,
            dataset_id="train",
            model_task=self.fdl.ModelTask.LLM,
            features=[PROMPT, RESPONSE],
            metadata_cols=[
                RUN_ID,
                TOTAL_TOKENS,
                PROMPT_TOKENS,
                COMPLETION_TOKENS,
                MODEL_NAME,
            ],
            custom_features=self.custom_features,
        )

        if self.model not in self.fiddler_client.get_model_names(self.project):
            print(  # noqa: T201
                f"adding model {self.model} to project {self.project}. "
                "This only has to be done once."
            )
            try:
                self.fiddler_client.add_model(
                    project_id=self.project,
                    dataset_id=self.model,
                    model_id=self.model,
                    model_info=model_info,
                )
            except Exception as e:
                print(  # noqa: T201
                    f"Error adding model {self.model}: {e}. "
                    "Fiddler integration will not work."
                )
                raise

    @property
    def custom_features(self) -> list:
        """
        Define custom features for the model to automatically enrich the data with.
        Here, we enable the following enrichments:
        - Automatic Embedding generation for prompt and response
        - Text Statistics such as:
            - Automated Readability Index
            - Coleman Liau Index
            - Dale Chall Readability Score
            - Difficult Words
            - Flesch Reading Ease
            - Flesch Kincaid Grade
            - Gunning Fog
            - Linsear Write Formula
        - PII - Personal Identifiable Information
        - Sentiment Analysis

        Returns:
            A list of Fiddler enrichment and text-embedding definitions built
            from the imported ``fiddler`` module.
        """
        return [
            self.fdl.Enrichment(
                name="Prompt Embedding",
                enrichment="embedding",
                columns=[PROMPT],
            ),
            self.fdl.TextEmbedding(
                name="Prompt CF",
                source_column=PROMPT,
                column="Prompt Embedding",
            ),
            self.fdl.Enrichment(
                name="Response Embedding",
                enrichment="embedding",
                columns=[RESPONSE],
            ),
            self.fdl.TextEmbedding(
                name="Response CF",
                source_column=RESPONSE,
                column="Response Embedding",
            ),
            self.fdl.Enrichment(
                name="Text Statistics",
                enrichment="textstat",
                columns=[PROMPT, RESPONSE],
                config={
                    "statistics": [
                        "automated_readability_index",
                        "coleman_liau_index",
                        "dale_chall_readability_score",
                        "difficult_words",
                        "flesch_reading_ease",
                        "flesch_kincaid_grade",
                        "gunning_fog",
                        "linsear_write_formula",
                    ]
                },
            ),
            self.fdl.Enrichment(
                name="PII",
                enrichment="pii",
                columns=[PROMPT, RESPONSE],
            ),
            self.fdl.Enrichment(
                name="Sentiment",
                enrichment="sentiment",
                columns=[PROMPT, RESPONSE],
            ),
        ]

    def on_llm_start(
        self, serialized: Dict[str, Any], prompts: List[str], **kwargs: Any
    ) -> Any:
        """Record the prompts and start time of an LLM run.

        Args:
            serialized: Serialized LLM description (unused here).
            prompts: Prompts sent to the LLM for this run.
            **kwargs: Must contain ``run_id``, used to key the per-run state.

        Raises:
            KeyError: If ``run_id`` is missing from ``kwargs``.
        """
        run_id = kwargs[RUN_ID]
        self.run_id_prompts[run_id] = prompts
        self.run_id_starttime[run_id] = int(time.time())

    def on_llm_end(self, response: LLMResult, **kwargs: Any) -> None:
        """Publish the prompts and responses of a finished LLM run to Fiddler.

        Builds one row per prompt with the response text, token usage (when
        reported in ``response.llm_output``), model name, run id and duration
        in whole seconds, then publishes the rows as a batch. Publishing errors
        are printed and swallowed so that the LLM call itself is not affected.

        Args:
            response: Result of the LLM run.
            **kwargs: Must contain the ``run_id`` previously seen by
                ``on_llm_start``.

        Raises:
            KeyError: If ``run_id`` is missing from ``kwargs`` or no start was
                recorded for it.
        """
        flattened_llmresult = response.flatten()
        run_id = kwargs[RUN_ID]

        # Pop the per-run state so the bookkeeping dicts do not grow without
        # bound over the lifetime of the handler.
        start_time = self.run_id_starttime.pop(run_id)
        prompts = self.run_id_prompts.pop(run_id)
        # Elapsed time is "now minus start"; the reverse yields negative values.
        run_duration = int(time.time()) - start_time

        token_usage_dict = {}
        model_name = ""
        if isinstance(response.llm_output, dict):
            if TOKEN_USAGE in response.llm_output:
                token_usage_dict = response.llm_output[TOKEN_USAGE]
            if MODEL_NAME in response.llm_output:
                model_name = response.llm_output[MODEL_NAME]

        prompt_responses = [
            llmresult.generations[0][0].text for llmresult in flattened_llmresult
        ]

        df = self.pd.DataFrame(
            {
                PROMPT: prompts,
                RESPONSE: prompt_responses,
            }
        )

        # Each usage figure goes into the column of the same name.
        if TOTAL_TOKENS in token_usage_dict:
            df[TOTAL_TOKENS] = int(token_usage_dict[TOTAL_TOKENS])

        if PROMPT_TOKENS in token_usage_dict:
            df[PROMPT_TOKENS] = int(token_usage_dict[PROMPT_TOKENS])

        if COMPLETION_TOKENS in token_usage_dict:
            df[COMPLETION_TOKENS] = token_usage_dict[COMPLETION_TOKENS]

        df[MODEL_NAME] = model_name
        df[RUN_ID] = str(run_id)
        df[DURATION] = run_duration

        try:
            self.fiddler_client.publish_events_batch(self.project, self.model, df)
        except Exception as e:
            # Best effort: monitoring failures must not break the LLM call.
            print(f"Error publishing events to fiddler: {e}. continuing...")  # noqa: T201