Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support for audit trails and metrics #628

Open
willsmanley opened this issue Aug 15, 2024 · 12 comments
Open

Support for audit trails and metrics #628

willsmanley opened this issue Aug 15, 2024 · 12 comments

Comments

@willsmanley
Copy link

willsmanley commented Aug 15, 2024

More of a question than an issue, but I wanted to see if you have a recommendation for creating audit trails or plans to support it more directly from a library interface.

I would like to have tools to record:

I am going to work on stitching all of this together and see how far I get, but I wanted to start a discussion here.

So far, unless you are using your own LLM plugin or forking an existing LLM plugin, it seems like you need to hook into the completion request within will_synthesize_assistant_reply. Capturing the response without forking seems trickier. And mp3, not sure about that yet.

@willsmanley
Copy link
Author

@davidzhao fyi, thank you in advance

@willsmanley
Copy link
Author

i'm making some limited progress here:

  1. i realized that mp3 recordings ought to be managed by livekit/egress, duh. specifically as an autoegress upon room creation request. strangely, i could only get this to work with a raw twirp request rather than with an SDK, but that's ok. So far, I am autoegressing the user's track, but it's not publishing the agent's track. I will update here if I figure out how to do that.
  2. i think LLM token usage is solved at a basic level from the PR linked above, but it could possibly be more of a first-class citizen of the API than this changeset suggests. That was just the minimal change required.
  3. seems hacky to log completion requests via will_synthesize_assistant_reply and still haven't made a good solution for logging completion responses.

in general, have library authors looked at pipecat? they made some clearly different choices, but just wanted to bring this up in case there is anything to be learned from their implementation. i made a mutable.ai wiki for both repos since both are early in their development and documentation:
https://wiki.mutable.ai/pipecat-ai/pipecat
https://wiki.mutable.ai/livekit/agents

@willsmanley
Copy link
Author

update: i figured out how to also egress the room composite in addition to the user's track, but still having issues with the agent-only track. created a separate issue here: #656

@willsmanley
Copy link
Author

created this PR for logging completion requests: #658

@keepingitneil
Copy link
Contributor

Yeah egress is a good way to record audio. For LLM completions you can use the VoiceAssistant user_speech_committed and agent_speech_committed events

https://github.com/livekit/agents/blob/main/livekit-agents/livekit/agents/voice_assistant/voice_assistant.py#L25

@davidzhao
Copy link
Member

I think in general this is a great idea. we'd want to capture metrics on usage and report them.

@willsmanley
Copy link
Author

willsmanley commented Aug 24, 2024

The problem with user_speech_committed and agent_speech_committed is that they only emit the most recent message (and not any RAG output from will_synthesize_agent_response or tool_calls either). They could be extended to emit other context as well. I made this PR which emits everything I would be interested in (and I assume others who are doing LLMops / monitoring): #658

And usage tokens PR here: #614

I'm happy to change either one if you want to let me know what you'd like to see differently.

On my fork with these changes (along with the egress part), I have pretty much e2e monitoring for how fast tokens are being spent, who is spending them, and what all of the llm requests/responses are. It even works if you need to log ChatImages, but requires you to serialize these into JSON.

@davidzhao davidzhao changed the title Support for audit trails? Support for audit trails and metrics? Oct 16, 2024
@davidzhao davidzhao changed the title Support for audit trails and metrics? Support for audit trails and metrics Oct 16, 2024
@davidzhao
Copy link
Member

thanks for your feedback (and patience) Wills! we'll create a proposal around this that satisfies what you are looking for. ideally timing information is included there as well.

@firattamurlc
Copy link

@willsmanley
Copy link
Author

I made some PRs which are linked above. They probably need to be rebased since there have been some API changes, but you can get the gist from that. And I updated the egress solution on the other thread since it isn't something that can be solved with a PR in this library.

@darinkishore
Copy link

darinkishore commented Oct 21, 2024

thanks for your feedback (and patience) Wills! we'll create a proposal around this that satisfies what you are looking for. ideally timing information is included there as well.

Hi David!

Have started playing around with the framework, have a realtime agent hooked up to a phone number I've been using for the last couple of weeks.

Just wanted to make sure that the Realtime agent as well as Pipeline will be covered (for the audio logging/processing/generally extracting the data). Have really been struggling with logging and tracing with that so far.

Do you have any ideas on how to go about doing that?

Right now, I'm trying to do so like so:

class AgentContext:
    """
    Context for event handlers, including user, conversation, and synchronization.
    """

    def __init__(self, user: User, conversation: Conversation):
        self.user = user
        self.conversation = conversation
        self.last_message_index = 0
        self.lock = asyncio.Lock()


async def run_multimodal_agent(
    ctx: JobContext, participant: rtc.Participant, db: AsyncSession
):
    """
    Sets up and runs the MultimodalAgent, registering event handlers for logging.
    """
    logger.info('Starting MultimodalAgent')
    room = ctx.room

    # Setup Realtime Model with OpenAI
    openai_realtime_model = realtime.RealtimeModel(
        instructions=(
            "\n\nCurrent time: {DateTime.now().format('h:mm A')}, {DateTime.now().format('MMMM D')}."
        ),
        temperature=0.8,
    )

    fnc_ctx = AssistantFnc()
    transcription_options = AgentTranscriptionOptions()
    agent = MultimodalAgent(
        model=openai_realtime_model,
        fnc_ctx=fnc_ctx,
        transcription=transcription_options,
    )

    user = await get_or_create_user(
        db, username='darin', email='[email protected]'
    )

    conversation = await create_conversation(db, user.user_id)

    agent_context = AgentContext(user, conversation)

    # Start agent
    agent.start(ctx.room, participant)
    logger.info('MultimodalAgent started and running.')

    # Access session via room's session
    session = openai_realtime_model.sessions[0]

    # Register event handlers
    session.on(
        'input_speech_transcription_completed',
        lambda: asyncio.create_task(handle_new_messages(session, agent_context)),
    )

    agent.on(
        'agent_stopped_speaking',
        lambda: asyncio.create_task(handle_new_messages(session, agent_context)),
    )
    
async def handle_new_messages(session, context: AgentContext):
    """
    Processes and logs new messages from ChatContext.
    """
    async with context.lock:
        async with get_db() as db:
            messages = session.chat_ctx.messages
            new_messages = messages[context.last_message_index :]

            for msg in new_messages:
                await process_message(msg, context, db)

            context.last_message_index = len(messages)


async def process_message(
    msg: llm.ChatMessage, context: AgentContext, db: AsyncSession
):
    """
    Logs individual messages and function calls to the database.
    """
    timestamp = DateTime.utcnow()
    sender = msg.role
    message_type = 'text'

    # Extract text
    if isinstance(msg.content, str):
        message_text = msg.content
    elif isinstance(msg.content, list):
        message_text = ''.join(part for part in msg.content if isinstance(part, str))
    else:
        message_text = ''

    if msg.tool_calls:
    # ...

But I'm unsure what the issue is right now. Is the getting things from the ChatCtx the way to go? What events should I listen for? I also really want to process the audio for realtime myself as well, because the current OpenAI transcripts are inaccurate a lot, but we can't configure the TTS provider, and I have no idea how to get the transcript of the convo at all really (async code).

Copy link
Member

RealtimeAPI is not yet emitting metrics. It's on our list and will be released shortly.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

6 participants