feat: Phase 5 — live streaming trip building via SSE

Grok now drives the trip rendering in real time instead of dumping
the full result after ~90 seconds.

Backend
- GrokHeadlessClient gains a chatStream() async generator that spawns
  grok with --output-format streaming-json (NDJSON of {type,data}
  events), buffers the "text" tokens, and emits partial events as the
  buffer becomes parseable.
- tryPartialJsonParse — lenient JSON repair: walks the buffer once,
  closes structures in stack order, drops in-progress strings and
  dangling keys, returns whatever object is currently consistent.
  Hard-tested with progressive slicing of a multi-stop itinerary.
- New SSE endpoint POST /api/chat/stream with events: open / thinking
  / partial / done / error. Uses res.on('close') + writableEnded as a
  reliable client-disconnect signal (req.on('close') fires in Express
  5 once the body is consumed, which was killing the grok child).

Frontend
- sendMessage swaps to fetch+ReadableStream against /api/chat/stream
  and parses SSE blocks. Each partial event runs a fast synchronous
  normalizePartialItinerary (no Nominatim — drops stops missing
  lat/lng so partial render doesn't block on geocoding).
- The done event runs the full async normalizer for the final pass
  and caches the result per variant.
- Stops, day cards, map markers, polylines, the variant strip, and
  the trip summary all update progressively as Grok writes each stop.

Verified with a London → Edinburgh prompt: 6 partial events landed
across the 76-second stream, with the rail filling in
"Baldock Services" → "+Grantham A1" → "+Premier Inn Newcastle"
→ "+Fort Kinnaird" before the final done event.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-05-20 16:01:00 +01:00
parent 0a97ea2006
commit ed64712525
3 changed files with 486 additions and 26 deletions
+277
View File
@@ -18,6 +18,107 @@ export interface ChatMessage { role: 'user' | 'assistant' | 'system'; content: s
export interface GrokResponse { text: string; updatedItinerary?: any; variants?: any[]; selectedVariant?: string; }
export type VehicleInput = string | { name: string; rangeKm?: number };
export interface StreamEvent {
type: 'thinking' | 'partial' | 'done' | 'error';
message?: string;
itinerary?: any;
variants?: any[];
text?: string;
selectedVariant?: string;
error?: string;
}
/**
* Lenient JSON parser — given a truncated/in-progress JSON string,
* balance open brackets/quotes and try to parse. Returns null on failure.
*
* Closes nested structures in correct stack order (innermost first).
*/
export function tryPartialJsonParse(input: string): any | null {
const first = input.indexOf('{');
if (first === -1) return null;
const stripped = input.slice(first).replace(/^```json\s*/, '').replace(/```\s*$/, '');
// Walk the buffer to record open structures and look for a complete top-level object
const stack: ('{' | '[')[] = [];
let inStr = false, escape = false;
let cleanEnd = -1;
let lastSafePos = -1; // last position where we're outside strings + at a "safe" punctuation
for (let i = 0; i < stripped.length; i++) {
const c = stripped[i];
if (escape) { escape = false; continue; }
if (inStr) {
if (c === '\\') { escape = true; continue; }
if (c === '"') { inStr = false; lastSafePos = i; }
continue;
}
if (c === '"') { inStr = true; continue; }
if (c === '{' || c === '[') { stack.push(c as '{' | '['); continue; }
if (c === '}' || c === ']') {
stack.pop();
if (stack.length === 0) cleanEnd = i;
lastSafePos = i;
continue;
}
if (c === ',' || c === ':') { lastSafePos = i; }
}
if (cleanEnd !== -1) {
try { return JSON.parse(stripped.slice(0, cleanEnd + 1)); } catch { /* fall through */ }
}
// Build repaired buffer:
// 1) Truncate to lastSafePos (a comma, colon, quote-close, or bracket-close)
// 2) Strip trailing comma OR a dangling key (e.g. `,"foo"` or `:"foo`)
// 3) Close stack in reverse order
let repaired = stripped;
if (inStr) {
// We're mid-string — chop everything from the opening quote
let q = repaired.length - 1;
while (q >= 0 && repaired[q] !== '"') q--;
if (q >= 0) repaired = repaired.slice(0, q);
inStr = false;
} else if (lastSafePos !== -1 && lastSafePos < repaired.length - 1) {
repaired = repaired.slice(0, lastSafePos + 1);
}
// Trim trailing comma, colon, or partial key/value
// e.g. `"name":` → drop the `:` and the preceding `"name"` (it'd be a key with no value)
for (let pass = 0; pass < 4; pass++) {
const before = repaired.length;
// Trailing colon (incomplete key) — drop the key
repaired = repaired.replace(/,?\s*"[^"]*"\s*:\s*$/, '');
// Trailing comma
repaired = repaired.replace(/,\s*$/, '');
// Dangling identifier (true/false/null/number-ish) after colon — drop
repaired = repaired.replace(/,?\s*"[^"]*"\s*:\s*[a-zA-Z0-9.\-+eE]+$/, '');
if (repaired.length === before) break;
}
// Rebuild the open-stack on the REPAIRED buffer (in case the trims changed it)
const finalStack: ('{' | '[')[] = [];
inStr = false; escape = false;
for (let i = 0; i < repaired.length; i++) {
const c = repaired[i];
if (escape) { escape = false; continue; }
if (inStr) {
if (c === '\\') { escape = true; continue; }
if (c === '"') inStr = false;
continue;
}
if (c === '"') { inStr = true; continue; }
if (c === '{' || c === '[') { finalStack.push(c as '{' | '['); continue; }
if (c === '}') { if (finalStack[finalStack.length - 1] === '{') finalStack.pop(); continue; }
if (c === ']') { if (finalStack[finalStack.length - 1] === '[') finalStack.pop(); continue; }
}
// Close in reverse stack order — '{' → '}', '[' → ']'
for (let i = finalStack.length - 1; i >= 0; i--) {
repaired += finalStack[i] === '{' ? '}' : ']';
}
try { return JSON.parse(repaired); } catch { return null; }
}
function vehicleName(v: VehicleInput): string {
return typeof v === 'string' ? v : v.name;
}
@@ -327,6 +428,182 @@ Respond with ONLY the JSON object.`;
}
}
/**
* Streaming chat — yields incremental partial itineraries as Grok produces output.
* Falls back to non-streaming if local CLI is unavailable.
*/
async *chatStream(messages: ChatMessage[], itinerary: any, vehicle: VehicleInput, selectedVariant: string = 'fast'): AsyncGenerator<StreamEvent> {
const requestId = crypto.randomUUID().slice(0, 8);
log.info({ requestId, vehicle: vehicleName(vehicle), selectedVariant }, '=== NEW STREAMING CHAT REQUEST ===');
const activeProvider = await this.getActiveProvider(requestId);
if (activeProvider !== 'local') {
// No real streaming for xAI/fallback yet — just do the regular call and emit a single done event
yield { type: 'thinking', message: 'Asking Grok…' };
const result = activeProvider === 'xai'
? await this.callXaiApi(messages, itinerary, vehicle, requestId, selectedVariant)
: await this.dumbFallback(messages, requestId);
yield {
type: 'done',
text: result.text,
itinerary: result.updatedItinerary,
variants: result.variants,
selectedVariant: result.selectedVariant ?? selectedVariant,
};
return;
}
const prompt = this.buildPrompt(messages, itinerary, vehicle, selectedVariant);
const tmp = await mkdtemp(join(tmpdir(), 'grok-eu-stream-'));
const disallowed = env.nodeEnv === 'development'
? 'search_replace,write_file,Agent,run_terminal_cmd'
: 'run_terminal_cmd,search_replace,write_file,Agent';
const args = [
'-p', prompt,
'--output-format', 'streaming-json',
'--yolo',
'--disallowed-tools', disallowed,
'--tools', 'web_search,web_fetch',
'--max-turns', '6',
'--cwd', tmp,
];
log.info({ requestId }, 'Spawning grok with streaming-json output');
const child = spawn(env.grokBin, args, {
cwd: tmp,
env: { ...process.env, FORCE_COLOR: '0', NO_COLOR: '1' },
stdio: ['ignore', 'pipe', 'pipe'],
});
child.stdout.setEncoding('utf8');
child.stderr.setEncoding('utf8');
type LineEvent = { type: string; data?: string; message?: string };
const lineQueue: LineEvent[] = [];
const errorChunks: string[] = [];
let lineBuffer = '';
let textBuffer = '';
let lastParseLen = 0;
let lastEmittedStops = 0;
let lastEmittedDays = 0;
let closed = false;
let closeCode: number | null = null;
let waker: (() => void) | null = null;
const pushLine = (raw: string) => {
if (!raw) return;
try {
const ev = JSON.parse(raw) as LineEvent;
lineQueue.push(ev);
if (waker) { const w = waker; waker = null; w(); }
} catch (e) {
log.warn({ requestId, raw: raw.slice(0, 200) }, 'Failed to parse grok stream line');
}
};
child.stdout.on('data', (chunk: Buffer) => {
lineBuffer += chunk.toString('utf8');
let nl: number;
while ((nl = lineBuffer.indexOf('\n')) !== -1) {
const line = lineBuffer.slice(0, nl).trim();
lineBuffer = lineBuffer.slice(nl + 1);
if (line) pushLine(line);
}
});
child.stderr.on('data', (chunk: Buffer) => { errorChunks.push(chunk.toString('utf8')); });
child.on('error', (err) => {
log.error({ requestId, err: String(err) }, 'grok child spawn error');
errorChunks.push(`spawn error: ${err}`);
closed = true;
closeCode = -1;
if (waker) { const w = waker; waker = null; w(); }
});
child.on('close', (code, signal) => {
if (lineBuffer.trim()) pushLine(lineBuffer.trim());
log.info({ requestId, code, signal, partialsEmitted: lastEmittedStops > 0 ? `${lastEmittedStops} stops` : 'none', bufferLen: textBuffer.length }, 'grok stream complete');
closed = true;
closeCode = code ?? 0;
if (waker) { const w = waker; waker = null; w(); }
});
const waitForLine = () => new Promise<void>((resolve) => {
if (lineQueue.length > 0 || closed) return resolve();
waker = resolve;
});
try {
yield { type: 'thinking', message: 'Connected to Grok — composing itinerary…' };
while (true) {
if (lineQueue.length === 0) {
if (closed) break;
await waitForLine();
continue;
}
const ev = lineQueue.shift()!;
if (ev.type === 'text' && typeof ev.data === 'string') {
textBuffer += ev.data;
// Parse every ~120 chars to keep CPU sane while still catching new stops fast
if (textBuffer.length - lastParseLen > 120) {
lastParseLen = textBuffer.length;
const partial = tryPartialJsonParse(textBuffer);
if (partial && partial.itinerary && Array.isArray(partial.itinerary.days)) {
const stopCount = partial.itinerary.days.reduce(
(sum: number, d: any) => sum + (Array.isArray(d?.stops)
? d.stops.filter((s: any) => s && typeof s.name === 'string' && typeof s.lat === 'number' && typeof s.lng === 'number').length
: 0),
0,
);
const dayCount = partial.itinerary.days.length;
if (stopCount > lastEmittedStops || dayCount > lastEmittedDays) {
log.debug({ requestId, stopCount, dayCount, bufLen: textBuffer.length }, 'emitting partial');
lastEmittedStops = stopCount;
lastEmittedDays = dayCount;
yield {
type: 'partial',
itinerary: partial.itinerary,
variants: Array.isArray(partial.variants) ? partial.variants : undefined,
message: partial.message,
};
}
}
}
} else if (ev.type === 'thought' && typeof ev.data === 'string') {
// Optional: surface short snippets of Grok's thinking
} else if (ev.type === 'error') {
log.error({ requestId, msg: ev.message }, 'grok streaming error event');
yield { type: 'error', error: ev.message || 'Grok stream error' };
}
}
if (closeCode !== 0) {
log.error({ requestId, closeCode, stderr: errorChunks.join('').slice(-400) }, 'grok stream exited non-zero');
yield { type: 'error', error: `grok exited with code ${closeCode}` };
return;
}
const final = this.parseGrokResponse(textBuffer);
yield {
type: 'done',
text: final.text,
itinerary: final.itinerary,
variants: final.variants,
selectedVariant,
};
} catch (err) {
log.error({ requestId, err: String(err) }, 'grok stream crashed');
yield { type: 'error', error: String(err) };
} finally {
try { if (!child.killed) child.kill(); } catch { /* ignore */ }
await rm(tmp, { recursive: true, force: true }).catch(() => {});
}
}
private async callXaiApi(messages: ChatMessage[], itinerary: any, vehicle: VehicleInput, requestId: string, selectedVariant: string = 'fast'): Promise<GrokResponse> {
const prompt = this.buildPrompt(messages, itinerary, vehicle, selectedVariant);
log.info({ requestId, promptLength: prompt.length, model: 'grok-4.3' }, 'Calling xAI API (grok-4.3 + JSON mode)');