Walkthrough
Follow one chat message — “I need something bold” — from the moment a user types it until the grounded reply renders in the browser.
The phrase isn’t an obvious coffee question. The router has to recognize that “bold” is an idiom for a dark roast or strong espresso, then use the vector-search route against the menu — that’s the whole point of the demo.
The path of this message:
1. The browser opens an SSE stream to /api/chat/stream.
2. Flash-Lite classifies it as PRODUCT_RAG.
3. Vertex AI embeds the question (with an Oracle-backed cache in front).
4. Oracle 26ai’s HNSW index returns the closest products.
5. Gemini may select among candidate product ids, and the runner renders one grounded final SSE event from the selected Oracle rows.
```mermaid
flowchart LR
    B([Browser]) -->|POST /api/chat/stream| C[Litestar controller]
    C --> R[ADKRunner]
    R --> I[Flash-Lite<br/>intent]
    I --> V[Vertex AI<br/>embedding]
    V --> O[(Oracle 26ai<br/>HNSW search)]
    O --> S[Structured selector]
    S --> F[Grounded renderer]
    F -->|SSE final| B
```
The whole product-RAG turn in one picture: a POST in, SSE out, with routing, retrieval, and answer formatting inside the runner.
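The routing step can be pictured as mapping the classifier’s raw label onto a closed set of intents. This is a sketch under assumptions: the `Intent` enum, the `parse_intent` helper, and the fallback to `GENERAL_CONVERSATION` for unrecognized labels are hypothetical, not the app’s code; only the intent names come from this page.

```python
from enum import Enum


class Intent(str, Enum):
    """Hypothetical enum of the routes named on this page."""

    PRODUCT_RAG = "PRODUCT_RAG"
    STORE_LOCATION = "STORE_LOCATION"
    PRODUCT_AVAILABILITY = "PRODUCT_AVAILABILITY"
    GENERAL_CONVERSATION = "GENERAL_CONVERSATION"


def parse_intent(raw_label: str) -> Intent:
    """Normalize a classifier label; unknown labels fall back to general conversation."""
    # Models sometimes add whitespace or quotes, or lowercase the label.
    normalized = raw_label.strip().strip("\"'`").upper()
    try:
        return Intent(normalized)
    except ValueError:
        # Assumption: anything unrecognized goes to the model-driven fallback path.
        return Intent.GENERAL_CONVERSATION


print(parse_intent(" product_rag\n").value)  # PRODUCT_RAG
```

Constraining the label to an enum keeps a slightly malformed model reply from silently selecting the wrong route.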
1. The browser opens an SSE stream
The chat form posts to /api/chat/stream. The controller validates the
message, derives the ADK session identity from the Litestar session, and
hands control to ADKRunner.stream_request(). Each event the runner yields
becomes one SSE block on the wire.
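To make “one SSE block on the wire” concrete, here is a minimal sketch of how a runner event serializes in the Server-Sent Events text format: an `event:` line, a `data:` line per payload line, and a blank line that tells the browser to dispatch the event. `format_sse_block` is a hypothetical helper; in the app, Litestar’s `ServerSentEvent` response performs this encoding.

```python
import json


def format_sse_block(event: dict[str, object]) -> str:
    """Serialize one runner event into a text/event-stream block (illustrative)."""
    event_type = str(event.get("type", "message"))
    payload = json.dumps(event, separators=(",", ":"))
    lines = [f"event: {event_type}"]
    # SSE allows multi-line data; every line needs its own "data:" prefix.
    lines.extend(f"data: {line}" for line in payload.splitlines())
    # The blank line terminates the block so the browser fires the event.
    return "\n".join(lines) + "\n\n"


print(format_sse_block({"type": "final", "text": "Try the dark roast."}), end="")
```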
```mermaid
flowchart TD
    F[Chat form] -->|fetch POST| H[stream_chat_message]
    H --> I[validate message]
    H --> S[adk_session_identity]
    H --> R[ADKRunner.stream_request]
    R -. delta / final / error .-> H
    H -->|ServerSentEvent| F
```
The chat routes live on a Litestar controller. stream_chat_message is the
SSE endpoint the form posts to — it validates input, bridges the Litestar
session to an ADK session, then delegates the streaming work to ADKRunner.
```python
@post(path="/api/chat/stream", name="chat.api.stream")
async def stream_chat_message(
    self, adk_runner: Inject[ADKRunner], tools_service: Inject[AgentToolsService], request: HTMXRequest
) -> ServerSentEvent:
    """Stream chat response events for the browser chat UI.

    Returns:
        Server-sent event stream with delta, final, or error events.

    Raises:
        ValidationException: If the user message fails validation (HTTP 400).
        AIServiceUnconfigured: If Vertex AI credentials are missing (HTTP 503).
    """
    # Validation and configuration checks run before the response starts,
    # so failures here still map to normal HTTP errors (400 / 503).
    data = await chat_form_from_request(request)
    clean_message = self.validate_message(data.message)
    validated_persona = self.validate_persona(data.persona)
    location_context = location_context_from_form(data)
    user_id, session_id = adk_session_identity(request)
    adk_runner.ensure_configured()

    async def stream_events() -> AsyncIterator[dict[str, str]]:
        try:
            async for event in adk_runner.stream_request(
                query=clean_message,
                user_id=user_id,
                session_id=session_id,
                persona=validated_persona,
                tools_service=tools_service,
                location_context=location_context,
            ):
                # One runner event becomes one SSE block; the SSE event name mirrors event["type"].
                event_type = str(event.get("type", "message"))
                yield {"event": event_type, "data": to_json(event, as_bytes=False)}
        except Exception as exc:  # noqa: BLE001
            # The 200 response has already started: log details server-side
            # and send the browser only a generic error event.
            await logger.aexception(
                "Chat stream failed after response started", error_type=type(exc).__name__, detail=str(exc)
            )
            yield {
                "event": "error",
                "data": to_json({"type": "error", "message": _STREAM_ERROR_MESSAGE}, as_bytes=False),
            }

    return ServerSentEvent(stream_events(), status_code=200)
```
Tour stop
The chat controller is intentionally thin. Validation, session bridging, and
error sanitization happen here; everything else is delegated to ADKRunner
and the Dishka-injected AgentToolsService.
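The `try`/`except` inside `stream_events` matters because, once streaming has begun, the status code and headers are already sent; the only way to report a mid-stream failure is another SSE event. A framework-free sketch of that pattern, assuming a hypothetical `fake_runner` that fails partway through and a `GENERIC_ERROR` string standing in for `_STREAM_ERROR_MESSAGE`:

```python
import asyncio
from collections.abc import AsyncIterator

GENERIC_ERROR = "Something went wrong. Please try again."  # stand-in for _STREAM_ERROR_MESSAGE


async def fake_runner() -> AsyncIterator[dict[str, str]]:
    """Hypothetical runner that emits one event and then fails mid-stream."""
    yield {"type": "delta", "text": "Looking at the menu"}
    raise RuntimeError("vector search timed out")


async def sanitized(events: AsyncIterator[dict[str, str]]) -> AsyncIterator[dict[str, str]]:
    """Pass events through; turn any failure into a generic error event."""
    try:
        async for event in events:
            yield event
    except Exception:
        # The app logs the real exception at this point (logger.aexception);
        # the client only ever sees the generic message.
        yield {"type": "error", "message": GENERIC_ERROR}


async def collect() -> list[dict[str, str]]:
    return [event async for event in sanitized(fake_runner())]


print(asyncio.run(collect()))
```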
2. Vertex AI embeds the question
When the runner classifies the turn as PRODUCT_RAG, it sends the question
to Vertex AI’s gemini-embedding-2 model with a query-purpose instruction
prepended to the text. The general-conversation fallback can call the same
vector-search tool, but this menu turn takes the deterministic route. Document
embeddings (the products themselves) are produced separately with a
document-purpose instruction, so each side is embedded for its retrieval role
while both land in the same vector space. Repeat queries are short-circuited
by the Oracle-backed embedding cache.
```mermaid
flowchart TD
    Q["query: I need something bold"] --> CK{embedding_cache hit?}
    CK -- yes --> EV["list[float] · 3072 dims"]
    CK -- no --> V[Vertex AI embed_content]
    V --> EV
    EV --> NX[next stop]
```
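The cache check in front of Vertex AI is a plain cache-aside lookup. A minimal sketch under assumptions: a dict stands in for the Oracle-backed `embedding_cache`, the hypothetical `fake_embed` stands in for the Vertex AI call, and the cache key is `(model, text)`, mirroring the `get_embedding(text, model)` call in the service code.

```python
import asyncio
from collections.abc import Awaitable, Callable

EmbedFn = Callable[[str], Awaitable[list[float]]]


class CachedEmbedder:
    """Cache-aside embedding lookup; the dict is a stand-in for an Oracle table."""

    def __init__(self, model: str, embed: EmbedFn) -> None:
        self.model = model
        self.embed = embed
        self.cache: dict[tuple[str, str], list[float]] = {}
        self.remote_calls = 0

    async def get(self, text: str) -> tuple[list[float], bool]:
        """Return (vector, cache_hit)."""
        key = (self.model, text)
        if key in self.cache:
            return self.cache[key], True  # hit: no remote call
        self.remote_calls += 1
        vector = await self.embed(text)  # miss: call the embedding service
        self.cache[key] = vector  # store it for the next identical query
        return vector, False


async def fake_embed(text: str) -> list[float]:
    """Hypothetical embedding call returning a tiny deterministic vector."""
    return [float(len(text)), 1.0, 0.0]


async def demo() -> list[bool]:
    embedder = CachedEmbedder("gemini-embedding-2", fake_embed)
    first = await embedder.get("I need something bold")
    second = await embedder.get("I need something bold")
    return [first[1], second[1]]


print(asyncio.run(demo()))  # [False, True]
```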
VertexAIService.get_text_embedding is the wrapper around the Vertex AI
gemini-embedding-2 call, with the Oracle-backed embedding cache check
in front of it. Gemini Embedding 2 does not use the old embedding task_type
API parameter; this app encodes query-vs-document intent in the text sent to the
embedding model.
```python
async def get_text_embedding(
    self, text: str, *, embedding_purpose: str = "document", return_cache_status: bool = False
) -> Any:
    # Cache first: repeat text for the same model skips the Vertex AI call.
    cached = await self.cache_service.get_embedding(text, self.embedding_model)
    if cached:
        return (cached, True) if return_cache_status else cached

    # The query/document purpose travels inside the text, not as a task_type parameter.
    content = _embedding_content(self.embedding_model, text, embedding_purpose)
    config_kwargs: dict[str, Any] = {"output_dimensionality": self.embedding_dimensions}
    response = await self.client.aio.models.embed_content(
        model=self.embedding_model, contents=content, config=EmbedContentConfig(**config_kwargs)
    )
    embedding_list = response.embeddings
    if not embedding_list or not embedding_list[0].values:
        return None
    embedding = embedding_list[0].values
    await self.cache_service.save_embedding(text, embedding, self.embedding_model)
    return (embedding, False) if return_cache_status else embedding
```
ADK 2.0 detail
Query/document purpose is not cosmetic. The user question and product fixture text are embedded with different instructions so Oracle compares vectors generated for compatible retrieval roles.
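A minimal sketch of carrying the purpose inside the text. The prefix strings and the `embedding_input` helper are hypothetical: the app’s `_embedding_content` helper and its exact instructions are not shown on this page. What matters is that queries and documents get different instructions, applied consistently on each side.

```python
from typing import Literal

Purpose = Literal["query", "document"]

# Hypothetical instruction prefixes; the app's real wording is not shown here.
PURPOSE_PREFIXES: dict[str, str] = {
    "query": "Represent this coffee-shop question for retrieving relevant menu items: ",
    "document": "Represent this menu item for retrieval: ",
}


def embedding_input(text: str, purpose: Purpose = "document") -> str:
    """Prepend a purpose instruction so each side is embedded for its role."""
    if purpose not in PURPOSE_PREFIXES:
        raise ValueError(f"unknown embedding purpose: {purpose!r}")
    return PURPOSE_PREFIXES[purpose] + text.strip()


print(embedding_input("I need something bold", "query"))
```

If fixtures were embedded with one wording and queries with another that drifts over time, similarity scores would shift; keeping the prefixes in one place avoids that.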
3. Oracle 26ai finds matching products
The 3072-dimension query vector is bound straight into a named SQL query.
Oracle’s HNSW index over product.embedding returns the top matches in a
single round trip; 1 - VECTOR_DISTANCE(..., COSINE) is reshaped into a
similarity score so a higher number means “more like the query.”
```mermaid
flowchart LR
    QV["list[float] · 3072"] --> P[ProductService.search_by_vector]
    P --> S[(named SQL<br/>vector-search-products)]
    S --> X[(Oracle HNSW index<br/>NEIGHBOR GRAPH)]
    X --> M["list[ProductMatch]<br/>id · name · price · score"]
```
Every query lives as a named SQL file under src/app/db/sql/; SQLSpec loads
them by key. Here is the vector search the product service runs:
```sql
-- name: vector-search-products
SELECT id,
       name,
       description,
       price,
       1 - VECTOR_DISTANCE(embedding, :query_vector, COSINE) AS similarity_score
FROM product
WHERE 1 - VECTOR_DISTANCE(embedding, :query_vector, COSINE) > :threshold
ORDER BY similarity_score DESC
FETCH FIRST :limit ROWS ONLY;
```
The matching ProductService method calls that named SQL and lets SQLSpec
map each row straight into a ProductMatch msgspec struct.
```python
async def search_by_vector(
    self,
    query_embedding: list[float],
    similarity_threshold: float = 0.7,
    limit: int = 5,
    *,
    store_id: int | None = None,
) -> list[ProductMatch]:
    # Store-scoped searches use a separate named query with an extra :store_id bind.
    sql_key = "vector-search-products-by-store" if store_id is not None else "vector-search-products"
    binds: dict[str, Any] = {"query_vector": query_embedding, "threshold": similarity_threshold, "limit": limit}
    if store_id is not None:
        binds["store_id"] = store_id
    return await self.driver.select(db_manager.get_sql(sql_key), **binds, schema_type=ProductMatch)
```
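`db_manager.get_sql(sql_key)` resolves a key such as `vector-search-products` to SQL text. The sketch below is a hypothetical loader, not SQLSpec’s actual API, that shows the idea: split a `.sql` file on `-- name:` comment headers and index each statement by its name. The two statements in `SQL_FILE` are simplified placeholders, not the app’s real queries.

```python
import re

# A "-- name: some-key" line on its own marks the start of a statement.
NAME_HEADER = re.compile(r"^--[ \t]*name:[ \t]*(?P<name>[\w-]+)[ \t]*$", re.MULTILINE)


def parse_named_queries(sql_text: str) -> dict[str, str]:
    """Split SQL text on '-- name:' headers into {name: statement}."""
    queries: dict[str, str] = {}
    headers = list(NAME_HEADER.finditer(sql_text))
    for index, header in enumerate(headers):
        # Each statement runs from the end of its header to the next header (or EOF).
        end = headers[index + 1].start() if index + 1 < len(headers) else len(sql_text)
        name = header.group("name")
        if name in queries:
            raise ValueError(f"duplicate query name: {name}")
        queries[name] = sql_text[header.end() : end].strip()
    return queries


# Simplified placeholder SQL for illustration only.
SQL_FILE = """
-- name: vector-search-products
SELECT id, name FROM product FETCH FIRST :limit ROWS ONLY;

-- name: vector-search-products-by-store
SELECT id, name FROM product WHERE store_id = :store_id FETCH FIRST :limit ROWS ONLY;
"""

queries = parse_named_queries(SQL_FILE)
store_id = 7
sql_key = "vector-search-products-by-store" if store_id is not None else "vector-search-products"
print(queries[sql_key])
```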
Oracle 26ai internals
See Oracle 26ai vector search for the HNSW
index shape and the vector_memory_size knob.
4. The runner emits a grounded final event
For PRODUCT_RAG, the runner does not stream speculative model deltas. It
may ask Gemini, through structured output, to select among the returned
product ids; it then validates that selection and renders one grounded final
event from the Oracle product rows. Store location and product availability
turns follow the same deterministic shape: classify first, query named SQL
through request-scoped services, then emit a single grounded event with
optional map actions.
```mermaid
flowchart TD
    I{intent} -->|PRODUCT_RAG| P[Product RAG]
    I -->|STORE_LOCATION| S[Store lookup]
    I -->|PRODUCT_AVAILABILITY| A[Inventory lookup]
    P --> O[(Oracle 26ai)]
    S --> O
    A --> O
    O --> R[Grounded final event]
```
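The validation step is what keeps the reply grounded: whatever ids the model picks, only ids Oracle actually returned survive, and the text is rendered from those rows rather than from model prose. A minimal sketch with hypothetical names (`ProductRow`, `render_grounded_final`) and an assumed fallback to the top-scored rows when the model’s selection is empty or entirely invalid:

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ProductRow:
    """Hypothetical shape of one retrieved product row."""

    id: int
    name: str
    price: float
    score: float


def render_grounded_final(
    rows: list[ProductRow], selected_ids: list[int], max_items: int = 3
) -> dict[str, object]:
    """Build one final event from retrieved rows, trusting only ids Oracle returned."""
    by_id = {row.id: row for row in rows}
    chosen: list[ProductRow] = []
    for product_id in selected_ids:
        row = by_id.get(product_id)
        # Skip ids the model invented or repeated; keep the model's order otherwise.
        if row is not None and row not in chosen:
            chosen.append(row)
    if not chosen:
        # Assumed fallback: the retrieved rows ranked by similarity score.
        chosen = sorted(rows, key=lambda row: row.score, reverse=True)
    if not chosen:
        return {"type": "final", "text": "I couldn't find a matching menu item.", "product_ids": []}
    chosen = chosen[:max_items]
    items = ", ".join(f"{row.name} (${row.price:.2f})" for row in chosen)
    return {"type": "final", "text": f"You might like: {items}", "product_ids": [row.id for row in chosen]}


rows = [ProductRow(1, "Dark Roast", 3.5, 0.91), ProductRow(2, "Double Espresso", 4.0, 0.88)]
print(render_grounded_final(rows, selected_ids=[2, 99]))
# {'type': 'final', 'text': 'You might like: Double Espresso ($4.00)', 'product_ids': [2]}
```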
For GENERAL_CONVERSATION, the runner falls through to the Google ADK 2.0
workflow. That path uses an LlmAgent with the same closure-bound tools and a
parallel FunctionNode classifier before the workflow output is packaged.
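```python
# intent, coffee, join, and merge are workflow nodes built earlier in this function.
# Both branches leave START and converge on join; max_concurrency=2 lets them run in parallel.
return Workflow(
    name="coffee_workflow",
    edges=[("START", intent, join), ("START", coffee, join), (join, merge)],
    max_concurrency=2,
)
```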
ADK 2.0 detail
The ADK workflow is still useful for general conversation and model-driven fallbacks, but grounded product, store, availability, and unsupported order routes are handled before the workflow is built. That is why menu turns never show an ungrounded draft that later gets overwritten.
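The ordering described above (deterministic routes first, the ADK workflow only as a fallback) can be sketched as a dispatch table. The handler functions, the `run_workflow` stand-in, and their return values are hypothetical placeholders; only the intent names come from this page.

```python
import asyncio
from collections.abc import AsyncIterator, Awaitable, Callable

Handler = Callable[[str], Awaitable[dict[str, str]]]


async def product_rag(query: str) -> dict[str, str]:
    return {"type": "final", "route": "product_rag"}


async def store_lookup(query: str) -> dict[str, str]:
    return {"type": "final", "route": "store_lookup"}


async def inventory_lookup(query: str) -> dict[str, str]:
    return {"type": "final", "route": "inventory_lookup"}


GROUNDED_ROUTES: dict[str, Handler] = {
    "PRODUCT_RAG": product_rag,
    "STORE_LOCATION": store_lookup,
    "PRODUCT_AVAILABILITY": inventory_lookup,
}


async def run_workflow(query: str) -> AsyncIterator[dict[str, str]]:
    """Stand-in for the ADK workflow, which may stream deltas before its final event."""
    yield {"type": "delta", "route": "workflow"}
    yield {"type": "final", "route": "workflow"}


async def stream_turn(intent: str, query: str) -> AsyncIterator[dict[str, str]]:
    handler = GROUNDED_ROUTES.get(intent)
    if handler is not None:
        # Grounded routes emit exactly one final event and never build the workflow.
        yield await handler(query)
        return
    async for event in run_workflow(query):
        yield event


async def collect(intent: str) -> list[str]:
    return [event["type"] async for event in stream_turn(intent, "I need something bold")]


print(asyncio.run(collect("PRODUCT_RAG")))  # ['final']
print(asyncio.run(collect("GENERAL_CONVERSATION")))  # ['delta', 'final']
```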
What’s next
Next come short pages on vectors in Oracle, RAG, Google ADK, and store/map
grounding. After those: the Quickstart, the coffee CLI, autodoc for ADKRunner
and the core services, and a “for the curious” appendix.