From ed64712525f107ddacac0a2381a1bb7d125dfc75 Mon Sep 17 00:00:00 2001 From: Tony James Date: Wed, 20 May 2026 16:01:00 +0100 Subject: [PATCH] =?UTF-8?q?feat:=20Phase=205=20=E2=80=94=20live=20streamin?= =?UTF-8?q?g=20trip=20building=20via=20SSE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Grok now drives the trip rendering in real time instead of dumping the full result after ~90 seconds. Backend - GrokHeadlessClient gains a chatStream() async generator that spawns grok with --output-format streaming-json (NDJSON of {type,data} events), buffers the "text" tokens, and emits partial events as the buffer becomes parseable. - tryPartialJsonParse — lenient JSON repair: walks the buffer once, closes structures in stack order, drops in-progress strings and dangling keys, returns whatever object is currently consistent. Hard-tested with progressive slicing of a multi-stop itinerary. - New SSE endpoint POST /api/chat/stream with events: open / thinking / partial / done / error. Uses res.on('close') + writableEnded as a reliable client-disconnect signal (req.on('close') fires in Express 5 once the body is consumed, which was killing the grok child). Frontend - sendMessage swaps to fetch+ReadableStream against /api/chat/stream and parses SSE blocks. Each partial event runs a fast synchronous normalizePartialItinerary (no Nominatim — drops stops missing lat/lng so partial render doesn't block on geocoding). - The done event runs the full async normalizer for the final pass and caches the result per variant. - Stops, day cards, map markers, polylines, the variant strip, and the trip summary all update progressively as Grok writes each stop. Verified with a London → Edinburgh prompt: 6 partial events landed across the 76-second stream, with the rail filling in "Baldock Services" → "+Grantham A1" → "+Premier Inn Newcastle" → "+Fort Kinnaird" before the final done event. Co-Authored-By: Claude Opus 4.7 (1M context) --- client/src/pages/TeslaTripPlanner.tsx | 164 +++++++++++-- server/routes/chat.ts | 71 ++++++ server/services/llm/GrokHeadlessClient.ts | 277 ++++++++++++++++++++++ 3 files changed, 486 insertions(+), 26 deletions(-) diff --git a/client/src/pages/TeslaTripPlanner.tsx b/client/src/pages/TeslaTripPlanner.tsx index 64cb7ad..8189603 100644 --- a/client/src/pages/TeslaTripPlanner.tsx +++ b/client/src/pages/TeslaTripPlanner.tsx @@ -383,6 +383,66 @@ async function normalizeAndSanitizeItinerary(raw: any): Promise { }; } +// Fast synchronous normalizer used for partial stream events — skips geocoding +// (Grok almost always provides lat/lng inline). Stops missing coords are dropped. +function normalizePartialItinerary(raw: any): Itinerary { + if (!raw || !Array.isArray(raw.days)) return EMPTY_ITINERARY; + const normalizedDays: Itinerary['days'] = []; + for (const day of raw.days) { + if (!day) continue; + const rawStops: any[] = Array.isArray(day.stops) ? day.stops : []; + const validStops: Stop[] = []; + for (const s of rawStops) { + if (!s || typeof s.name !== 'string') continue; + if (typeof s.lat !== 'number' || typeof s.lng !== 'number') continue; + const resolvedType: StopType = STOP_TYPES.includes(s.type) ? s.type : 'custom'; + validStops.push({ + id: s.id || `stop-${Date.now()}-${Math.random()}`, + name: s.name, + type: resolvedType, + lat: s.lat, + lng: s.lng, + day: day.day || 1, + order: s.order || validStops.length + 1, + estArrivalBattery: typeof s.estArrivalBattery === 'number' ? s.estArrivalBattery : undefined, + chargeMinutes: typeof s.chargeMinutes === 'number' ? s.chargeMinutes : undefined, + durationMin: typeof s.durationMin === 'number' ? s.durationMin : undefined, + combo: s.combo ?? null, + description: typeof s.description === 'string' ? s.description : undefined, + amenities: Array.isArray(s.amenities) ? s.amenities.filter((a: unknown) => typeof a === 'string') : undefined, + cuisine: typeof s.cuisine === 'string' ? s.cuisine : null, + priceLevel: typeof s.priceLevel === 'number' ? s.priceLevel : undefined, + notes: typeof s.notes === 'string' ? s.notes : undefined, + alternatives: undefined, // skip during partial — final event populates + nearby: Array.isArray(s.nearby) ? s.nearby.filter((n: any) => n && typeof n.name === 'string') : undefined, + chargerOptions: Array.isArray(s.chargerOptions) ? s.chargerOptions : undefined, + }); + } + if (validStops.length > 0) { + normalizedDays.push({ + day: day.day || normalizedDays.length + 1, + title: typeof day.title === 'string' ? day.title : undefined, + stops: validStops.sort((a, b) => a.order - b.order), + }); + } + } + const sortedDays = normalizedDays.sort((a, b) => a.day - b.day); + const allStops = sortedDays.flatMap(d => d.stops); + return { + days: sortedDays, + summary: { + totalDistanceKm: raw.summary?.totalDistanceKm ?? 0, + estDriveHours: raw.summary?.estDriveHours ?? 0, + estChargeHours: raw.summary?.estChargeHours ?? 0, + superchargers: allStops.filter(s => s.type === 'supercharger' || s.type === 'destination-charger').length, + hotels: allStops.filter(s => s.type === 'hotel').length, + highlights: Array.isArray(raw.summary?.highlights) + ? raw.summary.highlights.filter((h: unknown) => typeof h === 'string') + : [], + }, + }; +} + function normalizeVariants(raw: any): RouteVariant[] { if (!Array.isArray(raw)) return []; return raw @@ -1362,8 +1422,14 @@ export default function TeslaTripPlanner() { } if (opts.variant) setVariantSwitching(true); else setThinking(true); + + let lastPartialItinerary: any = null; + let lastVariants: any[] | null = null; + let lastSelectedVariant: RouteVariant['id'] | null = null; + let finalReply = ''; + try { - const response = await fetch('/api/chat', { + const response = await fetch('/api/chat/stream', { method: 'POST', headers: { 'Content-Type': 'application/json' }, body: JSON.stringify({ @@ -1374,35 +1440,78 @@ export default function TeslaTripPlanner() { selectedVariant: variantToUse, }), }); - if (!response.ok) throw new Error('Failed to get response from server'); - const data = await response.json(); - if (!opts.silent) { - setMessages(prev => [...prev, { id: Date.now() + 1, role: 'assistant', content: data.reply || 'No response.' }]); + if (!response.ok || !response.body) throw new Error('Failed to get streaming response'); + + const reader = response.body.getReader(); + const decoder = new TextDecoder(); + let sseBuffer = ''; + + while (true) { + const { value, done } = await reader.read(); + if (done) break; + sseBuffer += decoder.decode(value, { stream: true }); + let blankIdx: number; + while ((blankIdx = sseBuffer.indexOf('\n\n')) !== -1) { + const block = sseBuffer.slice(0, blankIdx); + sseBuffer = sseBuffer.slice(blankIdx + 2); + let evName = 'message'; + let evData = ''; + for (const rawLine of block.split('\n')) { + const line = rawLine.trim(); + if (line.startsWith('event:')) evName = line.slice(6).trim(); + else if (line.startsWith('data:')) evData += line.slice(5).trim(); + } + if (!evData) continue; + let payload: any = null; + try { payload = JSON.parse(evData); } catch { continue; } + + if (evName === 'thinking') { + // Could surface payload.message somewhere; for now we just show the existing spinner + } else if (evName === 'partial') { + lastPartialItinerary = payload.itinerary; + if (Array.isArray(payload.variants)) lastVariants = payload.variants; + // Use synchronous normalizer for partials — no geocoding, no blocking + if (payload.itinerary) { + setItinerary(normalizePartialItinerary(payload.itinerary)); + } + if (Array.isArray(payload.variants)) { + setVariants(normalizeVariants(payload.variants)); + } + } else if (evName === 'done') { + finalReply = payload.reply || ''; + if (payload.itinerary) { + const clean = await normalizeAndSanitizeItinerary(payload.itinerary); + setItinerary(clean); + const variantJustRendered = typeof payload.selectedVariant === 'string' + ? payload.selectedVariant as RouteVariant['id'] + : opts.variant ?? selectedVariant; + setVariantCache(prev => ({ ...prev, [variantJustRendered]: { itinerary: clean, legs: [] } })); + lastSelectedVariant = variantJustRendered; + } + if (Array.isArray(payload.variants)) { + setVariants(normalizeVariants(payload.variants)); + } + if (typeof payload.selectedVariant === 'string') { + setSelectedVariant(payload.selectedVariant as RouteVariant['id']); + } else if (opts.variant) { + setSelectedVariant(opts.variant); + } + } else if (evName === 'error') { + throw new Error(payload.error || 'Stream error'); + } + } } - if (data.itinerary) { - const clean = await normalizeAndSanitizeItinerary(data.itinerary); - setItinerary(clean); - // Pre-cache for the variant we just rendered (legs will be filled by useEffect) - const variantJustRendered = typeof data.selectedVariant === 'string' - ? data.selectedVariant as RouteVariant['id'] - : opts.variant ?? selectedVariant; - setVariantCache(prev => ({ ...prev, [variantJustRendered]: { itinerary: clean, legs: [] } })); + + if (!opts.silent && finalReply) { + setMessages(prev => [...prev, { id: Date.now() + 1, role: 'assistant', content: finalReply }]); } - if (Array.isArray(data.variants)) { - setVariants(normalizeVariants(data.variants)); - } - if (typeof data.selectedVariant === 'string') { - setSelectedVariant(data.selectedVariant as RouteVariant['id']); - } else if (opts.variant) { - setSelectedVariant(opts.variant); - } - if (data.itinerary && !opts.silent) { - toast.success('Grok updated your route'); - } else if (opts.variant) { - toast.success(`Switched to ${opts.variant} route`); + if (lastPartialItinerary && !opts.silent) { + toast.success('Grok finished your route'); + } else if (opts.variant && lastSelectedVariant) { + toast.success(`Switched to ${lastSelectedVariant} route`); } } catch (err: any) { - console.error('[TeslaTrip] Grok call failed:', err); + console.error('[TeslaTrip] Grok stream failed:', err); if (!opts.silent) { setMessages(prev => [...prev, { id: Date.now() + 1, role: 'assistant', content: "I'm having trouble reaching Grok right now. Check backend logs." }]); } @@ -1410,6 +1519,9 @@ export default function TeslaTripPlanner() { setThinking(false); setVariantSwitching(false); } + + // touch unused refs so eslint stays quiet (we keep them as breadcrumbs) + void lastVariants; }; const updateStop = (stopId: string, patch: Partial) => { diff --git a/server/routes/chat.ts b/server/routes/chat.ts index b764cac..d92cf0c 100644 --- a/server/routes/chat.ts +++ b/server/routes/chat.ts @@ -80,6 +80,77 @@ router.post('/chat', async (req, res) => { }); +router.post('/chat/stream', async (req, res) => { + const requestId = crypto.randomUUID().slice(0, 8); + const parsed = ChatRequestSchema.safeParse(req.body); + if (!parsed.success) { + return res.status(400).json({ error: 'Invalid request', issues: parsed.error.format() }); + } + const { message, vehicle, itinerary, history = [], selectedVariant = 'fast' } = parsed.data; + + res.setHeader('Content-Type', 'text/event-stream'); + res.setHeader('Cache-Control', 'no-cache, no-transform'); + res.setHeader('Connection', 'keep-alive'); + res.setHeader('X-Accel-Buffering', 'no'); + res.flushHeaders(); + + const send = (event: string, data: unknown) => { + res.write(`event: ${event}\n`); + res.write(`data: ${JSON.stringify(data)}\n\n`); + }; + + log.info({ requestId, userMessage: message, selectedVariant }, '=== STREAMING /api/chat/stream request ==='); + send('open', { requestId, selectedVariant }); + + let partialCount = 0; + let cancelled = false; + // Only trust res.on('close') with res.writableEnded as a guard to detect + // a real client disconnect (vs. our own res.end after the stream completes). + res.on('close', () => { + if (!res.writableEnded) { + log.info({ requestId }, 'client disconnected mid-stream'); + cancelled = true; + } + }); + + try { + const stream = grok.chatStream( + [...history, { role: 'user' as const, content: message }], + itinerary, + vehicle, + selectedVariant, + ); + for await (const ev of stream) { + if (cancelled) break; + if (ev.type === 'thinking') send('thinking', { message: ev.message }); + else if (ev.type === 'partial') { + partialCount++; + send('partial', { + itinerary: ev.itinerary, + variants: ev.variants, + message: ev.message, + partialIndex: partialCount, + }); + } else if (ev.type === 'done') { + send('done', { + reply: ev.text, + itinerary: ev.itinerary, + variants: ev.variants, + selectedVariant: ev.selectedVariant, + }); + } else if (ev.type === 'error') { + send('error', { error: ev.error }); + } + } + log.info({ requestId, partialCount }, 'stream complete'); + } catch (err) { + log.error({ requestId, err: String(err) }, 'streaming chat crashed'); + send('error', { error: 'Stream failed' }); + } finally { + res.end(); + } +}); + router.get('/grok/status', async (_req, res) => { try { const status = await grok.getStatus(); diff --git a/server/services/llm/GrokHeadlessClient.ts b/server/services/llm/GrokHeadlessClient.ts index 8a03990..f3b4803 100644 --- a/server/services/llm/GrokHeadlessClient.ts +++ b/server/services/llm/GrokHeadlessClient.ts @@ -18,6 +18,107 @@ export interface ChatMessage { role: 'user' | 'assistant' | 'system'; content: s export interface GrokResponse { text: string; updatedItinerary?: any; variants?: any[]; selectedVariant?: string; } export type VehicleInput = string | { name: string; rangeKm?: number }; +export interface StreamEvent { + type: 'thinking' | 'partial' | 'done' | 'error'; + message?: string; + itinerary?: any; + variants?: any[]; + text?: string; + selectedVariant?: string; + error?: string; +} + +/** + * Lenient JSON parser — given a truncated/in-progress JSON string, + * balance open brackets/quotes and try to parse. Returns null on failure. + * + * Closes nested structures in correct stack order (innermost first). + */ +export function tryPartialJsonParse(input: string): any | null { + const first = input.indexOf('{'); + if (first === -1) return null; + const stripped = input.slice(first).replace(/^```json\s*/, '').replace(/```\s*$/, ''); + + // Walk the buffer to record open structures and look for a complete top-level object + const stack: ('{' | '[')[] = []; + let inStr = false, escape = false; + let cleanEnd = -1; + let lastSafePos = -1; // last position where we're outside strings + at a "safe" punctuation + for (let i = 0; i < stripped.length; i++) { + const c = stripped[i]; + if (escape) { escape = false; continue; } + if (inStr) { + if (c === '\\') { escape = true; continue; } + if (c === '"') { inStr = false; lastSafePos = i; } + continue; + } + if (c === '"') { inStr = true; continue; } + if (c === '{' || c === '[') { stack.push(c as '{' | '['); continue; } + if (c === '}' || c === ']') { + stack.pop(); + if (stack.length === 0) cleanEnd = i; + lastSafePos = i; + continue; + } + if (c === ',' || c === ':') { lastSafePos = i; } + } + + if (cleanEnd !== -1) { + try { return JSON.parse(stripped.slice(0, cleanEnd + 1)); } catch { /* fall through */ } + } + + // Build repaired buffer: + // 1) Truncate to lastSafePos (a comma, colon, quote-close, or bracket-close) + // 2) Strip trailing comma OR a dangling key (e.g. `,"foo"` or `:"foo`) + // 3) Close stack in reverse order + let repaired = stripped; + if (inStr) { + // We're mid-string — chop everything from the opening quote + let q = repaired.length - 1; + while (q >= 0 && repaired[q] !== '"') q--; + if (q >= 0) repaired = repaired.slice(0, q); + inStr = false; + } else if (lastSafePos !== -1 && lastSafePos < repaired.length - 1) { + repaired = repaired.slice(0, lastSafePos + 1); + } + + // Trim trailing comma, colon, or partial key/value + // e.g. `"name":` → drop the `:` and the preceding `"name"` (it'd be a key with no value) + for (let pass = 0; pass < 4; pass++) { + const before = repaired.length; + // Trailing colon (incomplete key) — drop the key + repaired = repaired.replace(/,?\s*"[^"]*"\s*:\s*$/, ''); + // Trailing comma + repaired = repaired.replace(/,\s*$/, ''); + // Dangling identifier (true/false/null/number-ish) after colon — drop + repaired = repaired.replace(/,?\s*"[^"]*"\s*:\s*[a-zA-Z0-9.\-+eE]+$/, ''); + if (repaired.length === before) break; + } + + // Rebuild the open-stack on the REPAIRED buffer (in case the trims changed it) + const finalStack: ('{' | '[')[] = []; + inStr = false; escape = false; + for (let i = 0; i < repaired.length; i++) { + const c = repaired[i]; + if (escape) { escape = false; continue; } + if (inStr) { + if (c === '\\') { escape = true; continue; } + if (c === '"') inStr = false; + continue; + } + if (c === '"') { inStr = true; continue; } + if (c === '{' || c === '[') { finalStack.push(c as '{' | '['); continue; } + if (c === '}') { if (finalStack[finalStack.length - 1] === '{') finalStack.pop(); continue; } + if (c === ']') { if (finalStack[finalStack.length - 1] === '[') finalStack.pop(); continue; } + } + // Close in reverse stack order — '{' → '}', '[' → ']' + for (let i = finalStack.length - 1; i >= 0; i--) { + repaired += finalStack[i] === '{' ? '}' : ']'; + } + + try { return JSON.parse(repaired); } catch { return null; } +} + function vehicleName(v: VehicleInput): string { return typeof v === 'string' ? v : v.name; } @@ -327,6 +428,182 @@ Respond with ONLY the JSON object.`; } } + /** + * Streaming chat — yields incremental partial itineraries as Grok produces output. + * Falls back to non-streaming if local CLI is unavailable. + */ + async *chatStream(messages: ChatMessage[], itinerary: any, vehicle: VehicleInput, selectedVariant: string = 'fast'): AsyncGenerator { + const requestId = crypto.randomUUID().slice(0, 8); + log.info({ requestId, vehicle: vehicleName(vehicle), selectedVariant }, '=== NEW STREAMING CHAT REQUEST ==='); + + const activeProvider = await this.getActiveProvider(requestId); + + if (activeProvider !== 'local') { + // No real streaming for xAI/fallback yet — just do the regular call and emit a single done event + yield { type: 'thinking', message: 'Asking Grok…' }; + const result = activeProvider === 'xai' + ? await this.callXaiApi(messages, itinerary, vehicle, requestId, selectedVariant) + : await this.dumbFallback(messages, requestId); + yield { + type: 'done', + text: result.text, + itinerary: result.updatedItinerary, + variants: result.variants, + selectedVariant: result.selectedVariant ?? selectedVariant, + }; + return; + } + + const prompt = this.buildPrompt(messages, itinerary, vehicle, selectedVariant); + const tmp = await mkdtemp(join(tmpdir(), 'grok-eu-stream-')); + const disallowed = env.nodeEnv === 'development' + ? 'search_replace,write_file,Agent,run_terminal_cmd' + : 'run_terminal_cmd,search_replace,write_file,Agent'; + const args = [ + '-p', prompt, + '--output-format', 'streaming-json', + '--yolo', + '--disallowed-tools', disallowed, + '--tools', 'web_search,web_fetch', + '--max-turns', '6', + '--cwd', tmp, + ]; + + log.info({ requestId }, 'Spawning grok with streaming-json output'); + + const child = spawn(env.grokBin, args, { + cwd: tmp, + env: { ...process.env, FORCE_COLOR: '0', NO_COLOR: '1' }, + stdio: ['ignore', 'pipe', 'pipe'], + }); + + child.stdout.setEncoding('utf8'); + child.stderr.setEncoding('utf8'); + + type LineEvent = { type: string; data?: string; message?: string }; + const lineQueue: LineEvent[] = []; + const errorChunks: string[] = []; + let lineBuffer = ''; + let textBuffer = ''; + let lastParseLen = 0; + let lastEmittedStops = 0; + let lastEmittedDays = 0; + let closed = false; + let closeCode: number | null = null; + let waker: (() => void) | null = null; + + const pushLine = (raw: string) => { + if (!raw) return; + try { + const ev = JSON.parse(raw) as LineEvent; + lineQueue.push(ev); + if (waker) { const w = waker; waker = null; w(); } + } catch (e) { + log.warn({ requestId, raw: raw.slice(0, 200) }, 'Failed to parse grok stream line'); + } + }; + + child.stdout.on('data', (chunk: Buffer) => { + lineBuffer += chunk.toString('utf8'); + let nl: number; + while ((nl = lineBuffer.indexOf('\n')) !== -1) { + const line = lineBuffer.slice(0, nl).trim(); + lineBuffer = lineBuffer.slice(nl + 1); + if (line) pushLine(line); + } + }); + + child.stderr.on('data', (chunk: Buffer) => { errorChunks.push(chunk.toString('utf8')); }); + + child.on('error', (err) => { + log.error({ requestId, err: String(err) }, 'grok child spawn error'); + errorChunks.push(`spawn error: ${err}`); + closed = true; + closeCode = -1; + if (waker) { const w = waker; waker = null; w(); } + }); + + child.on('close', (code, signal) => { + if (lineBuffer.trim()) pushLine(lineBuffer.trim()); + log.info({ requestId, code, signal, partialsEmitted: lastEmittedStops > 0 ? `${lastEmittedStops} stops` : 'none', bufferLen: textBuffer.length }, 'grok stream complete'); + closed = true; + closeCode = code ?? 0; + if (waker) { const w = waker; waker = null; w(); } + }); + + const waitForLine = () => new Promise((resolve) => { + if (lineQueue.length > 0 || closed) return resolve(); + waker = resolve; + }); + + try { + yield { type: 'thinking', message: 'Connected to Grok — composing itinerary…' }; + + while (true) { + if (lineQueue.length === 0) { + if (closed) break; + await waitForLine(); + continue; + } + const ev = lineQueue.shift()!; + if (ev.type === 'text' && typeof ev.data === 'string') { + textBuffer += ev.data; + // Parse every ~120 chars to keep CPU sane while still catching new stops fast + if (textBuffer.length - lastParseLen > 120) { + lastParseLen = textBuffer.length; + const partial = tryPartialJsonParse(textBuffer); + if (partial && partial.itinerary && Array.isArray(partial.itinerary.days)) { + const stopCount = partial.itinerary.days.reduce( + (sum: number, d: any) => sum + (Array.isArray(d?.stops) + ? d.stops.filter((s: any) => s && typeof s.name === 'string' && typeof s.lat === 'number' && typeof s.lng === 'number').length + : 0), + 0, + ); + const dayCount = partial.itinerary.days.length; + if (stopCount > lastEmittedStops || dayCount > lastEmittedDays) { + log.debug({ requestId, stopCount, dayCount, bufLen: textBuffer.length }, 'emitting partial'); + lastEmittedStops = stopCount; + lastEmittedDays = dayCount; + yield { + type: 'partial', + itinerary: partial.itinerary, + variants: Array.isArray(partial.variants) ? partial.variants : undefined, + message: partial.message, + }; + } + } + } + } else if (ev.type === 'thought' && typeof ev.data === 'string') { + // Optional: surface short snippets of Grok's thinking + } else if (ev.type === 'error') { + log.error({ requestId, msg: ev.message }, 'grok streaming error event'); + yield { type: 'error', error: ev.message || 'Grok stream error' }; + } + } + + if (closeCode !== 0) { + log.error({ requestId, closeCode, stderr: errorChunks.join('').slice(-400) }, 'grok stream exited non-zero'); + yield { type: 'error', error: `grok exited with code ${closeCode}` }; + return; + } + + const final = this.parseGrokResponse(textBuffer); + yield { + type: 'done', + text: final.text, + itinerary: final.itinerary, + variants: final.variants, + selectedVariant, + }; + } catch (err) { + log.error({ requestId, err: String(err) }, 'grok stream crashed'); + yield { type: 'error', error: String(err) }; + } finally { + try { if (!child.killed) child.kill(); } catch { /* ignore */ } + await rm(tmp, { recursive: true, force: true }).catch(() => {}); + } + } + private async callXaiApi(messages: ChatMessage[], itinerary: any, vehicle: VehicleInput, requestId: string, selectedVariant: string = 'fast'): Promise { const prompt = this.buildPrompt(messages, itinerary, vehicle, selectedVariant); log.info({ requestId, promptLength: prompt.length, model: 'grok-4.3' }, 'Calling xAI API (grok-4.3 + JSON mode)');