Computer >> कंप्यूटर ट्यूटोरियल >  >> प्रोग्रामिंग >> Redis

ऐसी लचीली एलएलएम स्ट्रीम बनाएं जो डिस्कनेक्ट, रिफ्रेश और क्रैश के बावजूद बनी रहे

हम क्या बना रहे हैं

<पी> इस लेख में, हम अत्यंत टिकाऊ एलएलएम स्ट्रीम का निर्माण कर रहे हैं जो आसानी से जीवित रहती हैं:

  • नेटवर्क आउटेज
  • पेज रिफ्रेश
  • वेबसाइट बंद करना
  • लैपटॉप का ढक्कन बंद करना
<पी> बोनस:आप एक ही स्ट्रीम को एक ही समय में कई डिवाइस (जैसे फ़ोन और लैपटॉप) पर देख सकते हैं .

<पी> इससे कोई फर्क नहीं पड़ता कि आप धारा को तोड़ने की कितनी कोशिश करते हैं, आपके डिस्कनेक्ट होने पर भी यह पृष्ठभूमि में जारी रहती है और जब आप वापस आते हैं तो यह सुचारू रूप से जारी रहती है। यह एक अविश्वसनीय है उपयोगकर्ता अनुभव.

<पी> टिकाऊ एलएलएम स्ट्रीम डेमो 👇

प्रेरणा

<पी> AI के साथ निर्माण करते समय, वास्तविक समय में AI प्रतिक्रियाओं को स्ट्रीम करना सबसे अच्छा अभ्यास है।

<पी> संपूर्ण प्रतिक्रिया की प्रतीक्षा करने के बजाय, आपका उपयोगकर्ता वास्तविक समय में सामग्री को उत्पन्न होते ही देखता है - जो यूएक्स के लिए आश्चर्यजनक है। वर्सेल द्वारा एआई एसडीके जैसे टूल ने इसे बेहद आसान बना दिया है:

import { openai } from "@ai-sdk/openai";
import { streamText } from "ai";
 
const { textStream } = streamText({
 model: openai("gpt-4o"),
 prompt: "Invent a new holiday and describe its traditions.",
});
<पी> वास्तविक समय एलएलएम स्ट्रीम को तकनीकी स्तर पर काम करने के लिए, आप एक क्लाइंट को एपीआई से कनेक्ट करते हैं और सर्वर सेंट इवेंट्स (एसएसई) जैसे प्रोटोकॉल का उपयोग करके डेटा स्ट्रीम करते हैं:

<पी> ऐसी लचीली एलएलएम स्ट्रीम बनाएं जो डिस्कनेक्ट, रिफ्रेश और क्रैश के बावजूद बनी रहे

<पी> लेकिन:इस सेटअप में एक समस्या है। <पी> यदि स्ट्रीम के दौरान कुछ भी होता है, जैसे आपका इंटरनेट डिस्कनेक्ट होना, लैपटॉप का ढक्कन बंद होना, या नेटवर्क संबंधी दिक्कत, तो पूरी पीढ़ी नष्ट हो जाती है। आपको फिर से शुरुआत करने और पूरी पीढ़ी तक इंतजार करने की जरूरत है। यह विशेष रूप से लंबी पीढ़ियों के लिए कष्टप्रद है (उदाहरण के लिए O1 जैसे महंगे-से-चलने वाले मॉडल के साथ)।

<पी> ऐसी लचीली एलएलएम स्ट्रीम बनाएं जो डिस्कनेक्ट, रिफ्रेश और क्रैश के बावजूद बनी रहे

<पी> जाहिर है यह समस्या लोगों के रडार पर है। विश्वसनीय वास्तविक समय एलएलएम स्ट्रीमिंग की वास्तविक मांग है, और अधिक डेवलपर इसे काम करने के तरीकों के साथ प्रयोग कर रहे हैं:

<पी> ऐसी लचीली एलएलएम स्ट्रीम बनाएं जो डिस्कनेक्ट, रिफ्रेश और क्रैश के बावजूद बनी रहे

हाई-ड्यूरेबिलिटी एलएलएम स्ट्रीम का निर्माण

<पी> वास्तव में टिकाऊ, पुन:प्रारंभ करने योग्य एलएलएम स्ट्रीम बनाने का रहस्य ग्राहक को पीढ़ी के माहौल से अलग करना है। क्लाइंट कनेक्शन अस्थिर होते हैं और कई कारणों से डिस्कनेक्ट हो सकते हैं, जैसे लैपटॉप बंद करना, नेटवर्क समस्याएँ या पेज रीफ़्रेश करना।

<पी> क्लाइंट और जेनरेशन प्रक्रियाओं को अलग-अलग रखने से जेनरेशन हमेशा निर्बाध रूप से जारी रहती है। ग्राहक किसी भी समय चालू पीढ़ी को बाधित किए बिना पुनः कनेक्ट कर सकते हैं।

<पी> बुरा विचार:लगातार, सीधा संबंध: <पी> ऐसी लचीली एलएलएम स्ट्रीम बनाएं जो डिस्कनेक्ट, रिफ्रेश और क्रैश के बावजूद बनी रहे

<पी> अच्छा विचार:बदली जाने योग्य, बाधित स्ट्रीम कनेक्शन: <पी> ऐसी लचीली एलएलएम स्ट्रीम बनाएं जो डिस्कनेक्ट, रिफ्रेश और क्रैश के बावजूद बनी रहे

<पी> और हाँ - यह आर्किटेक्चर एक साधारण एआई स्ट्रीम के लिए काफी जटिल लग सकता है। हालाँकि, जैसा कि आप अभी कोड में देखेंगे, यह कोड की केवल कुछ पंक्तियाँ हैं और इसे लागू करने में कुछ मिनट लगते हैं।

टिकाऊ स्ट्रीम सेट करना

<पी> एक अत्यंत विश्वसनीय एलएलएम स्ट्रीम सेटअप के तीन भाग होते हैं:

  • ग्राहक (फ्रंटएंड)
  • स्ट्रीम जनरेटर (एक एपीआई मार्ग)
  • धारा उपभोक्ता (एक एपीआई रूट भी)
<पी> क्लाइंट से सीधे जुड़े सभी कनेक्शन किसी भी समय बाधित या रोके जा सकते हैं। एलएलएम आउटपुट स्ट्रीम (स्ट्रीम जनरेटर) उत्पन्न करने के लिए जिम्मेदार तर्क का टुकड़ा एक स्वतंत्र एपीआई होना चाहिए जिसका क्लाइंट से कभी भी सक्रिय कनेक्शन नहीं होता है।

<पी> इसके बजाय, हम एक उपभोक्ता के माध्यम से क्लाइंट से जुड़ेंगे - जो सिर्फ रेडिस से डेटा पढ़ता है और अन्यथा बहुत "बेवकूफ" होता है। इसका एकमात्र उद्देश्य जनरेटर के आउटपुट को पढ़ना है और जब भी कोई ग्राहक इससे जुड़ता है तो वह सभी एलएलएम खंड प्रदान करना है जिन्हें ग्राहक ने अभी तक नहीं देखा है। बस इतना ही.

<पी> त्वरित सारांश - प्रत्येक भाग क्या करता है:
  • ग्राहक: स्ट्रीम जनरेटर को ट्रिगर करता है (लेकिन कभी भी खुला कनेक्शन नहीं रखता है) और वास्तविक समय स्ट्रीम प्रस्तुत करता है
  • स्ट्रीम जनरेटर: वास्तविक समय में एलएलएम आउटपुट उत्पन्न करता है और रेडिस
  • पर प्रकाशित करता है
  • स्ट्रीम उपभोक्ता: जनरेटर की स्ट्रीम को पढ़ता है और क्लाइंट को टुकड़ों को भेजता है
<पी> जनरेटर केवल एलएलएम स्ट्रीम को पढ़ने और उसे वास्तविक समय में रेडिस पर प्रकाशित करने के लिए जिम्मेदार है। हमें क्लाइंट से स्ट्रीम उपभोक्ता के लिए एक प्रतिस्थापन योग्य कनेक्शन मिलता है जिसे समाप्त किया जा सकता है, पुनः जोड़ा जा सकता है, आदि-स्ट्रीम जनरेटर को कुछ भी प्रभावित नहीं करता है।

कोड उदाहरण

<पी> इस अनुभाग में, हम कोड पर एक नज़र डालेंगे। सिद्धांतों को बहुत स्पष्ट करने के लिए, हम अंत में वास्तविक, पूर्ण उत्पादन कोड कार्यान्वयन को देखेंगे।

<पी> फिलहाल, यदि हम संपूर्ण कोड फ़ाइलों के बजाय मुख्य स्निपेट और उनके उद्देश्य पर नज़र डालें तो कोड को समझना बहुत आसान है।

1. ग्राहक

<पी> ग्राहक की केवल 3 जिम्मेदारियाँ हैं:

  • सत्र आईडी जनरेट करना
  • जनरेटर चालू करना
  • जेनरेशन स्ट्रीम प्रस्तुत करना
<पी> आइए प्रत्येक को देखें:

क्लाइंट:सत्र आईडी जनरेट करना

<पी> जब कोई क्लाइंट किसी स्ट्रीम से जुड़ता है या दोबारा जुड़ता है, तो हम वे सभी संदेश भेजना चाहते हैं जो क्लाइंट ने अभी तक नहीं देखे हैं। इसका मतलब है कि एक सक्रिय स्ट्रीम के दौरान, प्रत्येक संदेश में केवल वही सटीक डेल्टा होता है जिसे क्लाइंट को देखना चाहिए, न कि संपूर्ण स्ट्रीम।

<पी> पुन:कनेक्ट करते समय, वर्तमान पीढ़ी बिंदु तक की पूरी स्ट्रीम भेज दी जाती है और भविष्य की सभी घटनाओं की सदस्यता बिना किसी छूटे हिस्से के बिल्कुल निर्बाध होती है।

<पी> कैसे? <पी> रेडिस स्ट्रीम, वास्तविक समय के डेटा को कुशलतापूर्वक संग्रहीत करने और पुनर्प्राप्त करने का एक तरीका है, इसमें उपभोक्ता समूहों के माध्यम से अभी तक नहीं देखी गई कार्यक्षमता अंतर्निहित है। केवल एक चीज जो हमें करने की ज़रूरत है:सुनिश्चित करें कि प्रत्येक ग्राहक के पास एक अद्वितीय सत्र हो - जिसका अर्थ है कि हम प्रत्येक पीढ़ी को एक अद्वितीय आईडी प्रदान करते हैं।

<पी> स्ट्रीम उपभोक्ता को देखते समय हम उपभोक्ता समूहों के बारे में अधिक जानेंगे। वे इस तरह दिखते हैं:

await redis.xgroup("redis-key", {
 type: "CREATE",
 group: "my-group-name",
 id: "0",
});
<पी> किस क्लाइंट ने किस स्ट्रीम को किस बिंदु तक देखा है और कौन से टुकड़े गायब हैं, इसका पूरा तर्क पूरी तरह से रेडिस स्ट्रीम द्वारा नियंत्रित किया जाता है गारंटीशुदा परिशुद्धता के साथ। हमें कभी भी खोया हुआ एलएलएम खंड नहीं मिलता है और हम हमेशा वही डेटा भेजते हैं जिसकी ग्राहक को आवश्यकता होती है।

<पी> ग्राहक को अभी केवल एक ही काम करना है:प्रत्येक पीढ़ी के लिए एक आईडी निर्दिष्ट करना। हम बस nanoid का उपयोग करते हैं :

import { customAlphabet } from "nanoid"
 
const nanoid = customAlphabet("0123456789", 6);

क्लाइंट:जनरेशन स्ट्रीम को ट्रिगर करना

<पी> जनरेशन इंजन के साथ क्लाइंट का अब तक का एकमात्र इंटरेक्शन इसे ट्रिगर करना है। हालाँकि, तकनीकी रूप से, आप कहीं और से भी पीढ़ी को ट्रिगर कर सकते हैं (उदाहरण के लिए CRON नौकरियां, स्वचालित पाइपलाइन)।

<पी> अपने सरलतम रूप में, यह जेनरेशन एपीआई रूट के लिए केवल एक फ़ेच कॉल है:

// 👇 trigger stream generator
await fetch("/api/llm-stream", {
 method: "POST",
 headers: {
 "Content-Type": "application/json",
 },
 body: JSON.stringify({ prompt, sessionId }),
});

क्लाइंट:जनरेशन स्ट्रीम पढ़ना

<पी> पीढ़ी को ट्रिगर करने के बाद, जनरेटर एलएलएम आउटपुट को एक केंद्रीकृत रेडिस स्टोर में स्ट्रीम करना शुरू कर देता है-क्लाइंट से पूरी तरह से अलग। आइए जनरेशन स्ट्रीम को पढ़ने के लिए स्ट्रीम उपभोक्ता से जुड़ें:

// 👇 connect to stream consumer
const res = await fetch(`/api/check-stream?sessionId=${sessionId}`, {
 headers: { "Content-Type": "text/event-stream" },
});
<पी> बस इतना ही!

<पी> वे ग्राहक की तीन जिम्मेदारियाँ हैं। बेशक, हम आईडी जनरेशन के लिए कस्टम हुक, अतिरिक्त विश्वसनीयता के लिए रिएक्ट-क्वेरी और बहुत कुछ के साथ बहुत अधिक प्रशंसक बन सकते हैं - हम इसे बाद में संपूर्ण कोड उदाहरणों में प्राप्त करेंगे।

2. स्ट्रीम जेनरेटर

<पी> स्ट्रीम जनरेटर एक एलएलएम स्ट्रीम खोलता है और प्रत्येक खंड को रेडिस स्ट्रीम में लिखता है। यह वास्तविक समय के अपडेट के लिए नए डेटा के बारे में स्ट्रीम उपभोक्ता को सचेत करने के लिए लिखे गए प्रत्येक खंड के लिए एक संदेश प्रकाशित करता है।

<पी> नोट:फिर, यह जानबूझकर एक पूर्ण कोड उदाहरण नहीं है। हम अंत में पूर्ण कोड प्राप्त करेंगे-यह अवधारणा को समझने के लिए है।

import { streamText } from "ai"
import { redis } from "@/utils"
 
const result = await new Promise(
 async (resolve, reject) => {
 const { textStream } = streamText({
 model: openai("gpt-4o"),
 prompt,
 onError: (err) => reject(err),
 onFinish: async () => {
 resolve({
 // ...
 }),
 })
 
 for await (const chunk of textStream) {
 if (chunk) {
 const chunkMessage: ChunkMessage = {
 type: MessageType.CHUNK,
 content: chunk,
 }
 
 // 👇 write chunk to redis stream
 await redis.xadd(streamKey, "*", chunkMessage)
 
 // 👇 alert consumer that there's a new chunk
 await redis.publish(streamKey, { type: MessageType.CHUNK })
 }
 }
 }
)

3. स्ट्रीम उपभोक्ता

<पी> स्ट्रीम उपभोक्ता रेडिस से जुड़ता है और रेडिस पब/सब के माध्यम से नए चंक अलर्ट सुनता है। प्रत्येक ग्राहक को अपने देखे और अनदेखे संदेशों को ट्रैक करने के लिए अपना स्वयं का उपभोक्ता समूह मिलता है।

<पी> ध्यान दें:प्रकाशन वास्तविक खंड को स्थानांतरित नहीं करता है, यह केवल सचेत करता है कि स्ट्रीम में एक नया खंड उपलब्ध है।

<पी> जब कोई नया हिस्सा उपलब्ध होता है, तो स्ट्रीम उपभोक्ता एपीआई इसे स्ट्रीम से पढ़ता है और सभी कनेक्टेड क्लाइंट्स को अग्रेषित करता है। रेडिस उपभोक्ता समूह इस बात पर नज़र रखते हैं कि प्रत्येक ग्राहक ने क्या देखा है ताकि यह गारंटी दी जा सके कि कोई डुप्लिकेट या गुम हुए टुकड़े स्थानांतरित न हों।

<पी> कोर स्ट्रीम उपभोक्ता इस तरह दिखता है:

const streamKey = `llm:stream:${sessionId}`;
const groupName = `sse-group-${nanoid()}`;
 
await redis.xgroup(streamKey, {
 type: "CREATE",
 group: groupName,
 id: "0",
});
 
const readStreamMessages = async () => {
 const chunks = (await redis.xreadgroup(
 groupName,
 `consumer-1`,
 streamKey,
 
 // 👇 built-in Redis stream functionality: only send unseen messages
 ">",
 )) as StreamData[];
 
 if (chunks?.length > 0) {
 const [_streamKey, messages] = chunks[0];
 for (const [_messageId, fields] of messages) {
 const rawObj = arrToObj(fields);
 const validatedMessage = validateMessage(rawObj);
 
 if (validatedMessage) {
 controller.enqueue(json(validatedMessage));
 }
 }
 }
};
 
// 👇 initial read
await readStreamMessages();
 
const subscription = redis.subscribe(streamKey);
 
subscription.on("message", async () => {
 // 👇 read every time a new chunk is written to stream
 await readStreamMessages();
});
<पी> नोट:हम प्रत्येक कनेक्शन पर एक उपभोक्ता समूह बना रहे हैं। यह इतनी अच्छी तरह से काम करता है क्योंकि रेडिस इस ऑपरेशन को निष्क्रियता से संभालता है, उदाहरण के लिए। यदि समूह पहले से मौजूद है तो कुछ नहीं होगा।

पूर्ण कोड

SessionID जनरेशन

<पी> अब तक, हमने ग्राहकों के कार्यों, स्ट्रीम जनरेटर और स्ट्रीम उपभोक्ता को व्यक्तिगत रूप से समझने के लिए कोड के अलग-अलग टुकड़ों को देखा है। अब, आइए पूर्ण कार्यान्वयन को देखकर देखें कि ये टुकड़े एक साथ कैसे फिट होते हैं।

<पी> आरंभ करने के लिए, केवल nanoid() का उपयोग करने की तुलना में sessionId बनाना अधिक लचीला होना चाहिए . आख़िरकार, यदि वेबसाइट ताज़ा हो जाए या बंद हो जाए तो क्या होगा? पुन:कनेक्ट करने पर, यदि हम इसे कहीं संग्रहीत नहीं करते हैं तो हम जेनरेट की गई सेशन आईडी खो देंगे - इसे तब तक बनाए रखने की आवश्यकता है जब तक पीढ़ी चलती है।

<पी> सौभाग्य से localStorage इसके लिए बिल्कुल उपयुक्त है:

import { customAlphabet } from "nanoid";
import { useRouter } from "next/navigation";
import { useCallback, useEffect, useState } from "react";
 
export const useLLMSession = () => {
 const [sessionId, setSessionId] = useState<string>("");
 const router = useRouter();
 const nanoid = customAlphabet("0123456789", 6);
 
 const updateUrlWithSessionId = useCallback(
 (id: string) => {
 const url = new URL(window.location.href);
 url.searchParams.set("sessionId", id);
 router.replace(url.toString(), { scroll: false });
 },
 [router],
 );
 
 useEffect(() => {
 const urlParams = new URLSearchParams(window.location.search);
 const urlSessionId = urlParams.get("sessionId");
 const storedSessionId = localStorage.getItem("llm-session-id");
 
 if (urlSessionId) {
 localStorage.setItem("llm-session-id", urlSessionId);
 setSessionId(urlSessionId);
 } else if (storedSessionId) {
 setSessionId(storedSessionId);
 updateUrlWithSessionId(storedSessionId);
 } else {
 const newSessionId = nanoid();
 localStorage.setItem("llm-session-id", newSessionId);
 setSessionId(newSessionId);
 updateUrlWithSessionId(newSessionId);
 }
 // eslint-disable-next-line react-hooks/exhaustive-deps
 }, []);
 
 const clearSessionId = useCallback(() => {
 localStorage.removeItem("llm-session-id");
 setSessionId("");
 const url = new URL(window.location.href);
 url.searchParams.delete("sessionId");
 router.replace(url.toString(), { scroll: false });
 }, [router]);
 
 const regenerateSessionId = () => {
 const newSessionId = nanoid();
 localStorage.setItem("llm-session-id", newSessionId);
 setSessionId(newSessionId);
 updateUrlWithSessionId(newSessionId);
 return newSessionId;
 };
 
 return {
 sessionId,
 regenerateSessionId,
 clearSessionId,
 };
};

ग्राहक

<पी> हमने क्लाइंट के दो सबसे महत्वपूर्ण भाग पहले ही देख लिए हैं:एक स्ट्रीम शुरू करना और एक स्ट्रीम से कनेक्ट करना। एक बार जब हमें अपने एपीआई से पुष्टि मिल जाती है कि जनरेटर चल रहा है, तो हम प्रतिक्रिया-क्वेरी refetch का उपयोग करके स्ट्रीम से जुड़ते हैं। हमारी कनेक्शन क्वेरी शुरू करने के लिए उपयोगिता।

<पी> यहां बताया गया है कि सभी टुकड़े एक साथ कैसे फिट होते हैं:

ऐप/पेज.tsx
"use client"
 
import { useLLMSession } from "@/use-llm-session"
import { useMutation, useQuery } from "@tanstack/react-query"
import { FormEvent, useRef, useState, useEffect } from "react"
import {
 MessageType,
 validateMessage,
 type ChunkMessage,
 type MetadataMessage,
 StreamStatus,
} from "@/lib/message-schema"
 
// precondition = stream is ready to read
class PreconditionFailedError extends Error {
 constructor(message: string) {
 super(message)
 this.name = "PreconditionFailedError"
 }
}
 
export default function Home() {
 const { sessionId, regenerateSessionId, clearSessionId } = useLLMSession()
 
 const [prompt, setPrompt] = useState("")
 const [status, setStatus] = useState<
 "idle" | "loading" | "streaming" | "completed" | "error"
 >("idle")
 const [response, setResponse] = useState("")
 const [chunkCount, setChunkCount] = useState(0)
 
 const controller = useRef<AbortController | null>(null)
 const responseRef = useRef<HTMLDivElement>(null)
 const isInitialRequest = useRef(true)
 
 // keep generation in viewport
 useEffect(() => {
 if (responseRef.current) {
 responseRef.current.scrollTop = responseRef.current.scrollHeight
 }
 }, [response])
 
 // start generator
 const { mutate, error, isIdle } = useMutation({
 mutationFn: async (newSessionId: string) => {
 controller.current?.abort()
 isInitialRequest.current = false
 
 await fetch("/api/llm-stream", {
 method: "POST",
 headers: {
 "Content-Type": "application/json",
 },
 body: JSON.stringify({ prompt, sessionId: newSessionId }),
 })
 },
 onSuccess: () => {
 setStatus("streaming")
 refetch()
 },
 })
 
 // connect to running stream
 const { refetch } = useQuery({
 queryKey: ["stream", sessionId],
 queryFn: async () => {
 if (!sessionId) return null
 
 setResponse("")
 setChunkCount(0)
 
 const abortController = new AbortController()
 controller.current = abortController
 
 const res = await fetch(`/api/check-stream?sessionId=${sessionId}`, {
 headers: { "Content-Type": "text/event-stream" },
 signal: controller.current.signal,
 })
 
 if (res.status === 412) {
 // stream is not yet ready, retry connection
 throw new PreconditionFailedError("Stream not ready yet")
 }
 
 if (!res.body) return null
 
 const reader = res.body.pipeThrough(new TextDecoderStream()).getReader()
 
 let streamContent = ""
 
 while (true) {
 const { value, done } = await reader.read()
 
 if (done) break
 
 if (value) {
 const messages = value.split("\n\n").filter(Boolean)
 
 for (const message of messages) {
 if (message.startsWith("data: ")) {
 const data = message.slice(6)
 try {
 const parsedData = JSON.parse(data)
 const validatedMessage = validateMessage(parsedData)
 
 if (!validatedMessage) continue
 
 switch (validatedMessage.type) {
 case MessageType.CHUNK:
 const chunkMessage = validatedMessage as ChunkMessage
 streamContent += chunkMessage.content
 setResponse((prev) => prev + chunkMessage.content)
 setChunkCount((prev) => prev + 1)
 break
 
 case MessageType.METADATA:
 const metadataMessage = validatedMessage as MetadataMessage
 
 if (metadataMessage.status === StreamStatus.COMPLETED) {
 setStatus("completed")
 }
 break
 
 case MessageType.ERROR:
 setStatus("error")
 break
 }
 } catch (e) {
 console.error("Failed to parse message:", e)
 }
 }
 }
 }
 }
 
 return streamContent
 },
 refetchOnWindowFocus: false,
 refetchOnMount: false,
 retry(failureCount, error) {
 if (isInitialRequest.current === true) return false
 
 if (error instanceof PreconditionFailedError) {
 return failureCount < 10
 }
 
 return false
 },
 })
 
 const handleSubmit = async (e: FormEvent) => {
 e.preventDefault()
 setStatus("loading")
 const newSessionId = regenerateSessionId()
 mutate(newSessionId)
 }
 
 const handleReset = () => {
 controller.current?.abort()
 clearSessionId()
 setPrompt("")
 setResponse("")
 setChunkCount(0)
 setStatus("idle")
 }
 
 return (
 <main className="flex min-h-screen flex-col items-center justify-between p-12 sm:p-24">
 <div className="z-10 max-w-5xl w-full items-center justify-between font-mono text-sm">
 <h1 className="text-4xl tracking-tight font-bold mb-8 text-center">
 Resumable LLM Stream
 </h1>
 
 <form onSubmit={handleSubmit} className="mb-8">
 <div className="mb-4">
 <label htmlFor="prompt" className="block text-sm font-medium mb-2">
 Enter your prompt:
 </label>
 <textarea
 autoFocus
 id="prompt"
 value={prompt}
 onChange={(e) => setPrompt(e.target.value)}
 className="w-full p-2 border border-zinc-700 rounded-md min-h-[100px] focus:outline-none focus:ring-2 focus:ring-blue-500 focus:border-transparent transition-all duration-200"
 placeholder="Ask the AI something..."
 disabled={status === "loading" || status === "streaming"}
 />
 </div>
 
 <div className="flex gap-4">
 <button
 type="submit"
 disabled={status === "loading" || status === "streaming"}
 className="px-4 py-2 bg-blue-600 text-white rounded-md hover:bg-blue-700 disabled:bg-gray-400"
 >
 {status === "loading"
 ? "Starting..."
 : status === "streaming"
 ? "Streaming..."
 : "Generate Response"}
 </button>
 <button
 type="button"
 onClick={handleReset}
 className="px-4 py-2 bg-zinc-600 text-white rounded-md hover:bg-zinc-700"
 >
 Reset
 </button>
 </div>
 </form>
 
 <div className="mt-8">
 <h2 className="text-xl tracking-tight font-semibold mb-2">
 Response:
 </h2>
 {status === "error" ? (
 <div className="p-4 bg-red-100 border border-red-300 rounded-md text-red-800">
 <p className="font-bold">Error:</p>
 <p>{error?.message}</p>
 </div>
 ) : status === "idle" && !response ? (
 <p className="text-gray-500">
 Enter a prompt and click "Generate Response" to see the AI's
 response.
 </p>
 ) : (
 <div
 ref={responseRef}
 className="flex flex-col h-96 overflow-y-auto p-4 bg-zinc-900 text-zinc-200 border border-zinc-800 rounded-md whitespace-pre-wrap [&::-webkit-scrollbar]:w-2 [&::-webkit-scrollbar-thumb]:bg-zinc-700 [&::-webkit-scrollbar-track]:bg-zinc-800"
 >
 <div>{response || "Loading..."}</div>
 </div>
 )}
 
 {(status === "streaming" || status === "completed") && (
 <div className="mt-2 text-sm text-gray-500">
 <p>Session ID: {sessionId}</p>
 <p>Status: {status}</p>
 <p>Chunks received: {chunkCount}</p>
 <p>
 Connection: {status === "streaming" ? "Active SSE" : "Closed"}
 </p>
 </div>
 )}
 </div>
 </div>
 </main>
 )
}

स्ट्रीम जेनरेटर

<पी> यहां स्ट्रीम जनरेटर के लिए संपूर्ण कोड है। यदि एलएलएम पीढ़ी किसी भी बिंदु पर विफल हो जाती है, तो अधिकतम विश्वसनीयता के लिए अपस्टैश वर्कफ़्लो का उपयोग करके इसे स्वचालित रूप से पुनः प्रयास किया जाता है:

api/llm-stream/route.ts
import {
 MessageType,
 StreamStatus,
 type ChunkMessage,
 type MetadataMessage,
} from "@/lib/message-schema";
import { redis } from "@/utils";
import { openai } from "@ai-sdk/openai";
import { serve } from "@upstash/workflow/nextjs";
import { streamText } from "ai";
 
interface LLMStreamResponse {
 success: boolean;
 sessionId: string;
 totalChunks: number;
 fullContent: string;
}
 
export const { POST } = serve(async (context) => {
 const { prompt, sessionId } = context.requestPayload as {
 prompt?: string;
 sessionId?: string;
 };
 
 if (!prompt || !sessionId) {
 throw new Error("Prompt and sessionId are required");
 }
 
 const streamKey = `llm:stream:${sessionId}`;
 
 await context.run("mark-stream-start", async () => {
 const metadataMessage: MetadataMessage = {
 type: MessageType.METADATA,
 status: StreamStatus.STARTED,
 completedAt: new Date().toISOString(),
 totalChunks: 0,
 fullContent: "",
 };
 
 await redis.xadd(streamKey, "*", metadataMessage);
 await redis.publish(streamKey, { type: MessageType.METADATA });
 });
 
 const res = await context.run("generate-llm-response", async () => {
 const result = await new Promise<LLMStreamResponse>(
 async (resolve, reject) => {
 let fullContent = "";
 let chunkIndex = 0;
 
 const { textStream } = streamText({
 model: openai("gpt-4o"),
 prompt,
 onError: (err) => reject(err),
 onFinish: async () => {
 resolve({
 success: true,
 sessionId,
 totalChunks: chunkIndex,
 fullContent,
 });
 },
 });
 
 for await (const chunk of textStream) {
 if (chunk) {
 fullContent += chunk;
 chunkIndex++;
 
 const chunkMessage: ChunkMessage = {
 type: MessageType.CHUNK,
 content: chunk,
 };
 
 await redis.xadd(streamKey, "*", chunkMessage);
 await redis.publish(streamKey, { type: MessageType.CHUNK });
 }
 }
 },
 );
 
 return result;
 });
 
 await context.run("mark-stream-end", async () => {
 const metadataMessage: MetadataMessage = {
 type: MessageType.METADATA,
 status: StreamStatus.COMPLETED,
 completedAt: new Date().toISOString(),
 totalChunks: res.totalChunks,
 fullContent: res.fullContent,
 };
 
 await redis.xadd(streamKey, "*", metadataMessage);
 await redis.publish(streamKey, { type: MessageType.METADATA });
 });
});
<पी> संपूर्ण प्रकार-सुरक्षा के लिए, मैंने सभी संदेश स्कीमा को zod:

में भी लिखा है message-schema.ts
import { z } from "zod";
 
export const MessageType = {
 CHUNK: "chunk",
 METADATA: "metadata",
 EVENT: "event",
 ERROR: "error",
} as const;
 
export const StreamStatus = {
 STARTED: "started",
 STREAMING: "streaming",
 COMPLETED: "completed",
 ERROR: "error",
} as const;
 
export const baseMessageSchema = z.object({
 type: z.enum([
 MessageType.CHUNK,
 MessageType.METADATA,
 MessageType.EVENT,
 MessageType.ERROR,
 ]),
});
 
export const chunkMessageSchema = baseMessageSchema.extend({
 type: z.literal(MessageType.CHUNK),
 content: z.string(),
});
 
export const metadataMessageSchema = baseMessageSchema.extend({
 type: z.literal(MessageType.METADATA),
 status: z.enum([
 StreamStatus.STARTED,
 StreamStatus.STREAMING,
 StreamStatus.COMPLETED,
 StreamStatus.ERROR,
 ]),
 completedAt: z.string().optional(),
 totalChunks: z.number().optional(),
 fullContent: z.string().optional(),
 error: z.string().optional(),
});
 
export const eventMessageSchema = baseMessageSchema.extend({
 type: z.literal(MessageType.EVENT),
});
 
export const errorMessageSchema = baseMessageSchema.extend({
 type: z.literal(MessageType.ERROR),
 error: z.string(),
});
 
export const messageSchema = z.discriminatedUnion("type", [
 chunkMessageSchema,
 metadataMessageSchema,
 eventMessageSchema,
 errorMessageSchema,
]);
 
export type Message = z.infer<typeof messageSchema>;
export type ChunkMessage = z.infer<typeof chunkMessageSchema>;
export type MetadataMessage = z.infer<typeof metadataMessageSchema>;
export type EventMessage = z.infer<typeof eventMessageSchema>;
export type ErrorMessage = z.infer<typeof errorMessageSchema>;
 
export const validateMessage = (data: unknown): Message | null => {
 const result = messageSchema.safeParse(data);
 return result.success ? result.data : null;
};

स्ट्रीम उपभोक्ता

<पी> अंत में, आइए पूर्ण स्ट्रीम उपभोक्ता कार्यान्वयन पर एक नज़र डालें। यह प्रतिस्थापन योग्य कनेक्शन है जो क्लाइंट के कनेक्ट होने पर सभी अनदेखे हिस्सों को स्वचालित रूप से भेजता है:

api/check-stream/route.ts
import { redis } from "@/utils"
import { nanoid } from "nanoid"
import { NextRequest, NextResponse } from "next/server"
import {
 validateMessage,
 MessageType,
 type ErrorMessage,
} from "@/lib/message-schema"
 
export const dynamic = "force-dynamic"
export const maxDuration = 60
export const runtime = "nodejs"
 
type StreamField = string
type StreamMessage = [string, StreamField[]]
type StreamData = [string, StreamMessage[]]
 
const arrToObj = (arr: StreamField[]) => {
 const obj: Record<string, string> = {}
 
 for (let i = 0; i < arr.length; i += 2) {
 obj[arr[i]] = arr[i + 1]
 }
 
 return obj
}
 
const json = (data: Record<string, unknown>) => {
 return new TextEncoder().encode(`data: ${JSON.stringify(data)}\n\n`)
}
 
export async function GET(req: NextRequest) {
 const { searchParams } = new URL(req.url)
 const sessionId = searchParams.get("sessionId")
 
 if (!sessionId) {
 return NextResponse.json(
 { error: "Stream key is required" },
 { status: 400 }
 )
 }
 
 const streamKey = `llm:stream:${sessionId}`
 const groupName = `sse-group-${nanoid()}`
 
 const keyExists = await redis.exists(streamKey)
 
 if (!keyExists) {
 return NextResponse.json(
 { error: "Stream does not (yet) exist" },
 { status: 412 }
 )
 }
 
 try {
 await redis.xgroup(streamKey, {
 type: "CREATE",
 group: groupName,
 id: "0",
 })
 } catch (_err) {}
 
 const response = new Response(
 new ReadableStream({
 async start(controller) {
 const readStreamMessages = async () => {
 const chunks = (await redis.xreadgroup(
 groupName,
 `consumer-1`,
 streamKey,
 ">"
 )) as StreamData[]
 
 if (chunks?.length > 0) {
 const [_streamKey, messages] = chunks[0]
 for (const [_messageId, fields] of messages) {
 const rawObj = arrToObj(fields)
 const validatedMessage = validateMessage(rawObj)
 
 if (validatedMessage) {
 controller.enqueue(json(validatedMessage))
 }
 }
 }
 }
 
 await readStreamMessages()
 
 const subscription = redis.subscribe(streamKey)
 
 subscription.on("message", async () => {
 await readStreamMessages()
 })
 
 subscription.on("error", (error) => {
 console.error(`SSE subscription error on ${streamKey}:`, error)
 
 const errorMessage: ErrorMessage = {
 type: MessageType.ERROR,
 error: error.message,
 }
 
 controller.enqueue(json(errorMessage))
 controller.close()
 })
 
 req.signal.addEventListener("abort", () => {
 console.log("Client disconnected, cleaning up subscription")
 subscription.unsubscribe()
 controller.close()
 })
 },
 }),
 {
 headers: {
 "Content-Type": "text/event-stream",
 "Cache-Control": "no-cache, no-transform",
 Connection: "keep-alive",
 },
 }
 )
 
 return response
}

त्वरित सारांश और अंतिम शब्द

<पी> हमने अभी-अभी एक बेहद मजबूत एलएलएम स्ट्रीम बनाई है जो नेटवर्क रुकावटों, पेज रिफ्रेश और यहां तक कि पूर्ण डिस्कनेक्शन को भी संभाल सकती है। हमने यह किया:

  • <पी> डिलीवरी से वियुग्मित पीढ़ी: एलएलएम जेनरेशन को क्लाइंट कनेक्शन से अलग करके, क्लाइंट की समस्याओं की परवाह किए बिना कंटेंट जेनरेशन जारी रहता है।

  • <पी> रेडिस स्ट्रीम का उपयोग करके लगातार भंडारण: हम एलएलएम प्रतिक्रिया के प्रत्येक भाग को उत्पन्न होने पर संग्रहीत करने के लिए एक सतत संदेश ब्रोकर के रूप में रेडिस स्ट्रीम का उपयोग कर रहे हैं।

  • <पी> रेडिस पब/सब के साथ वास्तविक समय अपडेट: हमने नए खंड उपलब्ध होने पर स्ट्रीम उपभोक्ताओं को सूचित करने के लिए रेडिस पब/सब का उपयोग करके एक अधिसूचना प्रणाली बनाई है।

  • <पी> स्वचालित पुनः कनेक्शन: क्लाइंट किसी भी समय पुनः कनेक्ट हो सकता है और स्वचालित रूप से सभी सामग्री प्राप्त कर सकता है, डुप्लिकेट या गायब खंडों के बिना इसकी गारंटी है। इसमें डिस्कनेक्ट के दौरान उत्पन्न सामग्री शामिल है।

  • <पी> सत्र प्रबंधन: हमने एक सत्र प्रणाली बनाई है जो उपयोगकर्ताओं को एक ही समय में कई उपकरणों पर स्ट्रीम देखने की अनुमति देती है।

<पी> मूल बात यह है कि अब हम अपने उपयोगकर्ताओं को एक असाधारण उपयोगकर्ता अनुभव (यूएक्स) प्रदान कर रहे हैं। मैं वास्तव में इस दृष्टिकोण की अनुशंसा करता हूं, खासकर यदि आप एलएलएम चैट सेवा जैसा कुछ बना रहे हैं।

<पी> पढ़ने के लिए शुभकामनाएँ! यदि आपके पास कोई प्रतिक्रिया है या आप अपस्टैश पर अतिथि लेखक बनना चाहते हैं, तो josh@upstash.com पर संपर्क करें 🙌


  1. HTML DOM स्थान प्रोटोकॉल संपत्ति HTML DOM स्थान प्रोटोकॉल संपत्ति

    स्थान प्रोटोकॉल गुण URL के लिए प्रयुक्त प्रोटोकॉल के अनुरूप स्ट्रिंग लौटाता/सेट करता है। प्रोटोकॉल को फ़ाइल:, http:, https:, आदि पर सेट किया जा सकता है। सिंटैक्स निम्नलिखित वाक्य रचना है - प्रोटोकॉल . का रिटर्निंग मान संपत्ति स्थान.प्रोटोकॉल प्रोटोकॉल का महत्व संपत्ति सेट स्थान.प्रोटोकॉल =प्रोट

  1. एचटीएमएल बनाम एक्सएमएल एचटीएमएल बनाम एक्सएमएल

    एचटीएमएल HTML हाइपरटेक्स्ट मार्कअप लैंग्वेज के लिए है जो एक वेब पेज की संरचना का वर्णन करने के लिए उपयोग की जाने वाली भाषा है। इसमें विभिन्न HTML तत्व होते हैं जो HTML टैग्स और उनकी सामग्री से बने होते हैं। HTML एक हाइपरटेक्स्ट भाषा है जिससे हम दस्तावेज़ों के लिंक की एक श्रृंखला बना सकते हैं। HTML

  1. एंड्रॉइड में अधिसूचना कैसे शेड्यूल करें? एंड्रॉइड में अधिसूचना कैसे शेड्यूल करें?

    यह उदाहरण एंड्रॉइड में अधिसूचना कैसे शेड्यूल करें के बारे में प्रदर्शित करता है। चरण 1 - एंड्रॉइड स्टूडियो में एक नया प्रोजेक्ट बनाएं, फाइल ⇒ न्यू प्रोजेक्ट पर जाएं और एक नया प्रोजेक्ट बनाने के लिए सभी आवश्यक विवरण भरें। चरण 2 - निम्न कोड को res/layout/activity_main.xml में जोड़ें। चरण 3 - निम