diff --git a/src/components/AIChat/AIChat.tsx b/src/components/AIChat/AIChat.tsx index bb06812a5..0fda90bab 100644 --- a/src/components/AIChat/AIChat.tsx +++ b/src/components/AIChat/AIChat.tsx @@ -229,15 +229,14 @@ export function AIChat(props: AIChatProps) { } } - const reader = response.body?.getReader(); - - if (!reader) { + const stream = response.body; + if (!stream) { setIsStreamingMessage(false); toast.error('Something went wrong'); return; } - await readChatStream(reader, { + await readChatStream(stream, { onMessage: async (content) => { const jsx = await renderMessage(content, aiChatRenderer, { isLoading: true, diff --git a/src/hooks/use-roadmap-ai-chat.tsx b/src/hooks/use-roadmap-ai-chat.tsx index dda428bcd..6f6abbcde 100644 --- a/src/hooks/use-roadmap-ai-chat.tsx +++ b/src/hooks/use-roadmap-ai-chat.tsx @@ -188,14 +188,14 @@ export function useRoadmapAIChat(options: Options) { return; } - const reader = response.body?.getReader(); - if (!reader) { + const stream = response.body; + if (!stream) { setIsStreamingMessage(false); toast.error('Something went wrong'); return; } - await readChatStream(reader, { + await readChatStream(stream, { onMessage: async (content) => { if (abortController?.signal.aborted) { return; diff --git a/src/lib/chat.ts b/src/lib/chat.ts index 4593c8ffc..b1004b865 100644 --- a/src/lib/chat.ts +++ b/src/lib/chat.ts @@ -1,10 +1,25 @@ export const CHAT_RESPONSE_PREFIX = { - message: '0:', - details: 'd:', + message: '0', + details: 'd', } as const; +const NEWLINE = '\n'.charCodeAt(0); + +function concatChunks(chunks: Uint8Array[], totalLength: number) { + const concatenatedChunks = new Uint8Array(totalLength); + + let offset = 0; + for (const chunk of chunks) { + concatenatedChunks.set(chunk, offset); + offset += chunk.length; + } + chunks.length = 0; + + return concatenatedChunks; +} + export async function readChatStream( - reader: ReadableStreamDefaultReader, + stream: ReadableStream, { onMessage, onMessageEnd, @@ -15,39 +30,62 @@ export async function readChatStream( onDetails?: (details: string) => Promise | void; }, ) { + const reader = stream.getReader(); const decoder = new TextDecoder('utf-8'); + const chunks: Uint8Array[] = []; + + let totalLength = 0; let result = ''; while (true) { - const { value, done } = await reader.read(); - if (done) { + const { value } = await reader.read(); + if (value) { + chunks.push(value); + totalLength += value.length; + if (value[value.length - 1] !== NEWLINE) { + // if the last character is not a new line, we need to wait for the next chunk + continue; + } + } + + if (chunks.length === 0) { + // end of stream break; } - const textWithNewLine = decoder.decode(value); - const text = textWithNewLine.replace(/\n$/, ''); + const concatenatedChunks = concatChunks(chunks, totalLength); + totalLength = 0; - if (text.startsWith(CHAT_RESPONSE_PREFIX.message)) { - const textWithoutPrefix = text.replace(CHAT_RESPONSE_PREFIX.message, ''); - - // basically we need to split the text by new line - // and send it to the onMessage callback - // so that we don't have broken tags for our rendering - let start = 0; - for (let i = 0; i < textWithoutPrefix.length; i++) { - if (textWithoutPrefix[i] === '\n') { - result += textWithoutPrefix.slice(start, i + 1); - await onMessage?.(result); - start = i + 1; + const streamParts = decoder + .decode(concatenatedChunks, { stream: true }) + .split('\n') + .filter((line) => line !== '') + .map((line) => { + const separatorIndex = line.indexOf(':'); + if (separatorIndex === -1) { + throw new Error('Invalid line: ' + line + '. No separator found.'); } - } - if (start < textWithoutPrefix.length) { - result += textWithoutPrefix.slice(start); + const prefix = line.slice(0, separatorIndex); + const content = line.slice(separatorIndex + 1); + + switch (prefix) { + case CHAT_RESPONSE_PREFIX.message: + return { type: 'message', content: JSON.parse(content) }; + case CHAT_RESPONSE_PREFIX.details: + return { type: 'details', content }; + default: + throw new Error('Invalid prefix: ' + prefix); + } + }); + + for (const part of streamParts) { + if (part.type === 'message') { + result += part.content; + await onMessage?.(result); + } else if (part.type === 'details') { + await onDetails?.(part.content); } - } else if (text.startsWith(CHAT_RESPONSE_PREFIX.details)) { - const textWithoutPrefix = text.replace(CHAT_RESPONSE_PREFIX.details, ''); - await onDetails?.(textWithoutPrefix); } }