In many applications involving large language models (LLMs), responses can be long or involve multiple stages of processing. Streaming these responses in real time gives a better user experience and reduces perceived latency, because the first tokens appear before the full answer is generated.
Azure Functions introduced the preview of HTTP streams in May 2024: Azure Functions: Support for HTTP Streams in Python is now in Preview!
LangChain supports streaming answers from LLMs via the ‘astream’ method: Streaming With LangChain
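As a minimal standalone sketch of ‘astream’ (the deployment name and environment variables here are assumptions, not part of the original example):

import asyncio
from langchain_openai import AzureChatOpenAI

async def main():
    # Assumes AZURE_OPENAI_ENDPOINT, AZURE_OPENAI_API_KEY and
    # OPENAI_API_VERSION are set; the deployment name is illustrative
    llm = AzureChatOpenAI(azure_deployment="gpt-4o")
    async for chunk in llm.astream("Who are you?"):
        print(chunk.content, end="", flush=True)

asyncio.run(main())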
Here is a code example showing how to fetch the LLM chat answer stream with LangChain and forward it through an HTTP-triggered Azure Function.
A real application would have a more complex LLM chain, with RAG or agents, but the streaming part of the code would stay the same.
import os
import logging
import json

import azure.functions as func
from azurefunctions.extensions.http.fastapi import Request, Response, StreamingResponse
from langchain_openai import AzureChatOpenAI

app = func.FunctionApp(http_auth_level=func.AuthLevel.ANONYMOUS)

# Function that processes the streamed response from the LLM
async def stream_processor(response):
    async for chunk in response:
        # Construct JSON with metadata and content
        partial_data = json.dumps({
            # Placeholder for deployment info
            "deployment": "Deployment Name",
            # Actual streamed content
            "delta_content": chunk.content
        })
        # Yield data in Server-Sent Events format
        yield f'data: {partial_data}\n\n'

@app.route(route="http_trigger", methods=[func.HttpMethod.POST])
async def http_trigger(req: Request) -> StreamingResponse:
    logging.info('Python HTTP trigger function processed a request.')
    try:
        req_body = await req.json()
        usermessage = req_body.get('user')
    except ValueError:
        usermessage = None
    if not usermessage:
        # Use the FastAPI-style Response from the streaming extension,
        # not func.HttpResponse, so both code paths use the same model
        return Response("Invalid input", status_code=400)

    # AzureChatOpenAI reads AZURE_OPENAI_ENDPOINT, AZURE_OPENAI_API_KEY and
    # OPENAI_API_VERSION from the environment; the setting name for the
    # deployment below is illustrative
    llm = AzureChatOpenAI(azure_deployment=os.environ["AZURE_OPENAI_DEPLOYMENT"])
    response = llm.astream(usermessage)  # error handling around the LLM call omitted
    return StreamingResponse(stream_processor(response), media_type="text/event-stream")
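To run this, the function app needs the FastAPI extension package alongside the usual dependencies; a requirements.txt along these lines should work (versions omitted):

azure-functions
azurefunctions-extensions-http-fastapi
langchain-openai

At the time of the preview, the announcement also called for the app setting "PYTHON_ENABLE_INIT_INDEXING": "1"; check the current Azure Functions documentation, as preview requirements may change.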
This function receives the user question in the JSON body of the request and forwards it to the LLM:
{"user" : "Who are you?"}
‘chunk.content’ is the plain-text answer from (in this example) OpenAI ChatGPT. The Azure Function could return these plain-text chunks directly, but in most cases it is more useful to wrap them in JSON together with additional metadata.
The OpenAI API does the same; here is an example curl to verify:
curl -X POST "<ChatGPT deployment URL>/chat/completions?api-version=2023-09-01-preview" \
 -H 'api-key: your-api-key' \
 -H 'Content-Type: application/json' \
 -d '{"stream": true, "messages": [{"role": "user", "content": "Who are you?"}]}'
Additionally, server-sent events (SSE) define how the data should be framed: every data chunk is prefixed with the string ‘data: ’ and terminated with \n\n.
Therefore, this example creates a sample JSON object with one placeholder attribute (‘deployment’) and a ‘delta_content’ attribute carrying the pieces of the LLM answer, formats it to comply with SSE, and streams it back.
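With the sample request above, the stream returned by the function would look roughly like this (answer text illustrative):

data: {"deployment": "Deployment Name", "delta_content": "I am"}

data: {"deployment": "Deployment Name", "delta_content": " an AI"}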
All of this needs to be stitched together on the client side: https://axld.substack.com/p/how-to-receive-the-streaming-llm
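For a quick test outside the browser, here is a minimal Python client sketch, assuming the function runs on the default local endpoint (a web front end would typically use fetch and a ReadableStream instead):

import json
import requests

# Open the streaming connection and reassemble the answer chunk by chunk
with requests.post(
    "http://localhost:7071/api/http_trigger",
    json={"user": "Who are you?"},
    stream=True,
) as resp:
    for line in resp.iter_lines(decode_unicode=True):
        # SSE data lines start with 'data: '; blank lines separate events
        if line and line.startswith("data: "):
            payload = json.loads(line[len("data: "):])
            print(payload["delta_content"], end="", flush=True)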