<पी> स्टीफ़न सानवो द्वारा <पी> एक कार्यशील पूर्ण-स्टैक एप्लिकेशन बनाने के लिए, सोचने के लिए बहुत सारे गतिशील भागों की आवश्यकता होती है। और आपको कई निर्णय लेने होंगे जो आपके ऐप की सफलता के लिए महत्वपूर्ण होंगे। <पी> उदाहरण के लिए, आप किस भाषा का उपयोग करेंगे और किस प्लेटफ़ॉर्म पर तैनात होंगे? क्या आप किसी सर्वर पर कंटेनरीकृत सॉफ़्टवेयर तैनात करने जा रहे हैं, या बैकएंड को संभालने के लिए सर्वर रहित फ़ंक्शंस का उपयोग करने जा रहे हैं? क्या आप प्रमाणीकरण या भुगतान जैसे अपने एप्लिकेशन के जटिल हिस्सों को संभालने के लिए तृतीय-पक्ष एपीआई का उपयोग करने की योजना बना रहे हैं? आप डेटा कहाँ संग्रहीत करते हैं? पी> <पी> इन सबके अलावा, आपको अपने एप्लिकेशन के उपयोगकर्ता इंटरफ़ेस, डिज़ाइन और उपयोगिता और बहुत कुछ के बारे में भी सोचना होगा। पी> <पी> यही कारण है कि जटिल बड़े अनुप्रयोगों के लिए ऐप बनाने के लिए एक बहुक्रियाशील विकास टीम की आवश्यकता होती है। <पी> पूर्ण स्टैक एप्लिकेशन विकसित करने का तरीका सीखने का सबसे अच्छा तरीका ऐसी परियोजनाएं बनाना है जो एंड-टू-एंड विकास प्रक्रिया को कवर करती हैं। आप आर्किटेक्चर को डिजाइन करने, एपीआई सेवाओं को विकसित करने, यूजर इंटरफेस विकसित करने और अंत में अपने एप्लिकेशन को तैनात करने से गुजरेंगे। पी> <पी> तो यह ट्यूटोरियल आपको इन अवधारणाओं को गहराई से सीखने में मदद करने के लिए एआई चैटबॉट बनाने की प्रक्रिया में ले जाएगा। <पी> हम जिन विषयों को कवर करेंगे उनमें शामिल हैं: - पायथन, फास्टएपीआई और वेबसॉकेट के साथ एपीआई कैसे बनाएं
- रेडिस के साथ रीयल-टाइम सिस्टम कैसे बनाएं
- रिएक्ट के साथ चैट यूजर इंटरफेस कैसे बनाएं
<पी> महत्वपूर्ण नोट: यह एक मध्यवर्ती पूर्ण स्टैक सॉफ़्टवेयर विकास परियोजना है जिसके लिए कुछ बुनियादी पायथन और जावास्क्रिप्ट ज्ञान की आवश्यकता होती है। पी> <पी> मैंने यह सुनिश्चित करने के लिए प्रोजेक्ट को सावधानीपूर्वक खंडों में विभाजित किया है कि यदि आप पूर्ण एप्लिकेशन को कोड नहीं करना चाहते हैं तो आप आसानी से उस चरण का चयन कर सकते हैं जो आपके लिए महत्वपूर्ण है। <पी> आप यहां My Github पर संपूर्ण रिपॉजिटरी डाउनलोड कर सकते हैं। सामग्री तालिका
धारा 1
- एप्लिकेशन आर्किटेक्चर
- विकास परिवेश कैसे स्थापित करें
धारा 2
- पायथन, फास्टएपीआई और वेबसॉकेट के साथ चैट सर्वर कैसे बनाएं
- पायथन पर्यावरण कैसे स्थापित करें
- FastAPI सर्वर सेटअप
- एपीआई में रूट कैसे जोड़ें
- UUID के साथ चैट सेशन टोकन कैसे जेनरेट करें
- पोस्टमैन के साथ एपीआई का परीक्षण कैसे करें
- वेबसॉकेट और कनेक्शन प्रबंधक
- FastAPI में निर्भरता इंजेक्शन
धारा 3
- रेडिस के साथ रीयल-टाइम सिस्टम कैसे बनाएं
- रेडिस और वितरित मैसेजिंग कतारें
- रेडिस क्लाइंट के साथ पायथन में रेडिस क्लस्टर से कैसे जुड़ें
- रेडिस स्ट्रीम्स के साथ कैसे काम करें
- चैट डेटा को मॉडल कैसे करें
- रेडिस JSON के साथ कैसे काम करें
- टोकन निर्भरता को कैसे अद्यतन करें
धारा 4
- एआई मॉडल के साथ चैटबॉट्स में इंटेलिजेंस कैसे जोड़ें
- हगिंगफेस से कैसे शुरुआत करें
- भाषा मॉडल के साथ कैसे इंटरैक्ट करें
- एआई मॉडल के लिए अल्पकालिक मेमोरी का अनुकरण कैसे करें
- संदेश कतार से उपभोक्ता और रीयल-टाइमडीडेटा स्ट्रीम स्ट्रीम करें
- चैट क्लाइंट को AI रिस्पॉन्स के साथ कैसे अपडेट करें
- टोकन ताज़ा करें
- पोस्टमैन में एकाधिक ग्राहकों के साथ चैट का परीक्षण कैसे करें
एप्लिकेशन आर्किटेक्चर
<पी> एक समाधान आर्किटेक्चर को स्केच करने से आपको अपने एप्लिकेशन का उच्च-स्तरीय अवलोकन मिलता है, जिन उपकरणों का आप उपयोग करना चाहते हैं, और घटक एक-दूसरे के साथ कैसे संचार करेंगे। पी> <पी> मैंने नीचे Draw.io का उपयोग करके एक सरल आर्किटेक्चर तैयार किया है: <पी>
फुलस्टैक चैटबॉट आर्किटेक्चर पी> <पी> आइए वास्तुकला के विभिन्न भागों पर अधिक विस्तार से विचार करें: क्लाइंट/यूजर इंटरफ़ेस
<पी> यूजर इंटरफेस बनाने के लिए हम रिएक्ट संस्करण 18 का उपयोग करेंगे। चैट यूआई वेबसॉकेट के माध्यम से बैकएंड के साथ संचार करेगा। GPT-J-6B और हगिंगफेस इन्फेरेंस एपीआई
<पी> GPT-J-6B एक जेनरेटिव भाषा मॉडल है जिसे 6 बिलियन मापदंडों के साथ प्रशिक्षित किया गया था और यह कुछ कार्यों पर OpenAI के GPT-3 के साथ मिलकर काम करता है। पी> <पी> मैंने GPT-J-6B का उपयोग करना चुना है क्योंकि यह एक ओपन-सोर्स मॉडल है और सरल उपयोग के मामलों के लिए भुगतान किए गए टोकन की आवश्यकता नहीं है। पी> <पी> हगिंगफेस हमें इस मॉडल से जुड़ने के लिए एक ऑन-डिमांड एपीआई भी निःशुल्क प्रदान करता है। आप GPT-J-6B और हगिंग फेस इंफ़रेंस एपीआई के बारे में अधिक पढ़ सकते हैं। रेडिस
<पी> जब हम GPT को संकेत भेजते हैं, तो हमें संकेतों को संग्रहीत करने और आसानी से प्रतिक्रिया प्राप्त करने का एक तरीका चाहिए। हम चैट डेटा को स्टोर करने के लिए रेडिस JSON का उपयोग करेंगे और हगिंगफेस इंट्रेंस एपीआई के साथ वास्तविक समय संचार को संभालने के लिए रेडिस स्ट्रीम का भी उपयोग करेंगे। पी> <पी> रेडिस एक इन-मेमोरी की-वैल्यू स्टोर है जो JSON-जैसे डेटा को सुपर-फास्ट लाने और संग्रहीत करने में सक्षम बनाता है। इस ट्यूटोरियल के लिए, हम परीक्षण उद्देश्यों के लिए रेडिस एंटरप्राइज द्वारा प्रदान किए गए प्रबंधित निःशुल्क रेडिस स्टोरेज का उपयोग करेंगे। वेब सॉकेट और चैट एपीआई
<पी> क्लाइंट और सर्वर के बीच वास्तविक समय में संदेश भेजने के लिए, हमें एक सॉकेट कनेक्शन खोलने की आवश्यकता है। ऐसा इसलिए है क्योंकि क्लाइंट और सर्वर के बीच वास्तविक समय द्वि-दिशात्मक संचार सुनिश्चित करने के लिए HTTP कनेक्शन पर्याप्त नहीं होगा। पी> <पी> हम चैट सर्वर के लिए फास्टएपीआई का उपयोग करेंगे, क्योंकि यह हमारे उपयोग के लिए एक तेज़ और आधुनिक पायथन सर्वर प्रदान करता है। WebSockets के बारे में अधिक जानने के लिए (FastAPI दस्तावेज़) देखें। विकास परिवेश कैसे स्थापित करें
<पी> इस ऐप को बनाने के लिए आप अपने इच्छित ओएस का उपयोग कर सकते हैं - मैं वर्तमान में मैकओएस और विजुअल स्टूडियो कोड का उपयोग कर रहा हूं। बस सुनिश्चित करें कि आपके पास Python और NodeJs स्थापित हैं। पी> <पी> प्रोजेक्ट संरचना स्थापित करने के लिए, fullstack-ai-chatbot नाम का एक फ़ोल्डर बनाएं . फिर प्रोजेक्ट के भीतर client नामक दो फ़ोल्डर बनाएं और server . सर्वर बैकएंड के लिए कोड रखेगा, जबकि क्लाइंट फ्रंटएंड के लिए कोड रखेगा। <पी> इसके बाद प्रोजेक्ट निर्देशिका के भीतर, "git init" कमांड का उपयोग करके प्रोजेक्ट फ़ोल्डर के रूट के भीतर एक Git रिपॉजिटरी आरंभ करें। फिर "टच .gitignore" का उपयोग करके एक .gitignore फ़ाइल बनाएं: git init
touch .gitignore
<पी> अगले भाग में, हम FastAPI और Python का उपयोग करके अपना चैट वेब सर्वर बनाएंगे। पायथन, फास्टएपीआई और वेबसॉकेट के साथ चैट सर्वर कैसे बनाएं
<पी> इस अनुभाग में, हम उपयोगकर्ता के साथ संचार करने के लिए फास्टएपीआई का उपयोग करके चैट सर्वर का निर्माण करेंगे। हम क्लाइंट और सर्वर के बीच द्वि-दिशात्मक संचार सुनिश्चित करने के लिए वेबसॉकेट का उपयोग करेंगे ताकि हम वास्तविक समय में उपयोगकर्ता को प्रतिक्रिया भेज सकें। पायथन पर्यावरण कैसे स्थापित करें
<पी> अपना सर्वर शुरू करने के लिए, हमें अपना पायथन वातावरण स्थापित करना होगा। वीएस कोड के भीतर प्रोजेक्ट फ़ोल्डर खोलें, और टर्मिनल खोलें। <पी> प्रोजेक्ट रूट से, सीडी को सर्वर डायरेक्टरी में डालें और python3.8 -m venv env चलाएँ . इससे एकआभासी वातावरणबनेगा हमारे पायथन प्रोजेक्ट के लिए, जिसका नाम env होगा . आभासी वातावरण को सक्रिय करने के लिए, source env/bin/activate चलाएँ पी> <पी> इसके बाद, अपने पायथन परिवेश में कुछ लाइब्रेरी स्थापित करें। pip install fastapi uuid uvicorn gunicorn WebSockets python-dotenv aioredis
<पी> इसके बाद touch .env चलाकर एक पर्यावरण फ़ाइल बनाएं टर्मिनल में. हम अपने ऐप वेरिएबल्स और गुप्त वेरिएबल्स को .env के भीतर परिभाषित करेंगे फ़ाइल. पी> <पी> अपना ऐप पर्यावरण चर जोड़ें और इसे "विकास" पर इस प्रकार सेट करें:export APP_ENV=development . इसके बाद, हम एक फास्टएपीआई सर्वर के साथ एक डेवलपमेंट सर्वर स्थापित करेंगे। FastAPI सर्वर सेटअप
<पी> सर्वर निर्देशिका के मूल में, main.py नामक एक नई फ़ाइल बनाएं फिर डेवलपमेंट सर्वर के लिए नीचे दिए गए कोड को पेस्ट करें: from fastapi import FastAPI, Request
import uvicorn
import os
from dotenv import load_dotenv
load_dotenv()
api = FastAPI()
@api.get("/test")
async def root():
return {"msg": "API is Online"}
if __name__ == "__main__":
if os.environ.get('APP_ENV') == "development":
uvicorn.run("main:api", host="0.0.0.0", port=3500,
workers=4, reload=True)
else:
pass
<पी> सबसे पहले हम import FastAPI और इसे api के रूप में प्रारंभ करें . फिर हम import load_dotenv python-dotenv से लाइब्रेरी, और .env से वेरिएबल्स को लोड करने के लिए इसे आरंभीकृत करें फ़ाइल, <पी> फिर हम एपीआई का परीक्षण करने के लिए एक सरल परीक्षण मार्ग बनाते हैं। परीक्षण मार्ग एक सरल JSON प्रतिक्रिया लौटाएगा जो हमें बताएगा कि एपीआई ऑनलाइन है। पी> <पी> अंत में, हमने uvicorn.run का उपयोग करके विकास सर्वर स्थापित किया और आवश्यक तर्क प्रदान करना। एपीआई पोर्ट 3500 पर चलेगा . <पी> अंत में, टर्मिनल में सर्वर को python main.py के साथ चलाएँ . एक बार आप Application startup complete देख लें टर्मिनल में, अपने ब्राउज़र पर URL http://localhost:3500/test पर जाएँ, और आपको इस तरह का एक वेब पेज मिलना चाहिए: <पी>
एपीआई परीक्षण पृष्ठ पी> एपीआई में रूट कैसे जोड़ें
<पी> इस अनुभाग में, हम अपने एपीआई में रूट जोड़ेंगे। src नाम से एक नया फ़ोल्डर बनाएं . यह वह निर्देशिका है जहां हमारे सभी एपीआई कोड रहेंगे। पी> <पी> routes नाम का एक सबफ़ोल्डर बनाएं , फ़ोल्डर में सीडी, chat.py नामक एक नई फ़ाइल बनाएं और फिर नीचे कोड जोड़ें: import os
from fastapi import APIRouter, FastAPI, WebSocket, Request
chat = APIRouter()
# @route POST /token
# @desc Route to generate chat token
# @access Public
@chat.post("/token")
async def token_generator(request: Request):
return None
# @route POST /refresh_token
# @desc Route to refresh token
# @access Public
@chat.post("/refresh_token")
async def refresh_token(request: Request):
return None
# @route Websocket /chat
# @desc Socket for chatbot
# @access Public
@chat.websocket("/chat")
async def websocket_endpoint(websocket: WebSocket = WebSocket):
return None
<पी> हमने तीन समापन बिंदु बनाए: /token चैट सत्र तक पहुंच के लिए उपयोगकर्ता को एक सत्र टोकन जारी करेगा। चूंकि चैट ऐप सार्वजनिक रूप से खुला रहेगा, हम प्रमाणीकरण के बारे में चिंता नहीं करना चाहते हैं और इसे सरल रखना चाहते हैं - लेकिन हमें अभी भी प्रत्येक अद्वितीय उपयोगकर्ता सत्र की पहचान करने का एक तरीका चाहिए।
/refresh_token कनेक्शन खो जाने पर उपयोगकर्ता के लिए सत्र इतिहास प्राप्त होगा, जब तक कि टोकन अभी भी सक्रिय है और समाप्त नहीं हुआ है।
/chat क्लाइंट और सर्वर के बीच संदेश भेजने के लिए एक वेबसॉकेट खोलेगा।
<पी> इसके बाद, चैट रूट को हमारे मुख्य एपीआई से कनेक्ट करें। सबसे पहले हमें import chat from src.chat करना होगा हमारे main.py के भीतर फ़ाइल. फिर हम वस्तुतः include_router पर कॉल करके राउटर को शामिल करेंगे प्रारंभिक FastAPI पर विधि क्लास और पासिंग चैट को तर्क के रूप में। पी> <पी> अपना api.py अपडेट करें कोड जैसा कि नीचे दिखाया गया है: from fastapi import FastAPI, Request
import uvicorn
import os
from dotenv import load_dotenv
from routes.chat import chat
load_dotenv()
api = FastAPI()
api.include_router(chat)
@api.get("/test")
async def root():
return {"msg": "API is Online"}
if __name__ == "__main__":
if os.environ.get('APP_ENV') == "development":
uvicorn.run("main:api", host="0.0.0.0", port=3500,
workers=4, reload=True)
else:
pass
UUID के साथ चैट सेशन टोकन कैसे जेनरेट करें
<पी> उपयोगकर्ता टोकन उत्पन्न करने के लिए हम uuid4 का उपयोग करेंगे हमारे चैट समापन बिंदु के लिए गतिशील मार्ग बनाने के लिए। चूँकि यह एक सार्वजनिक रूप से उपलब्ध समापन बिंदु है, इसलिए हमें JWTs और प्रमाणीकरण के बारे में विवरण में जाने की आवश्यकता नहीं होगी। पी> <पी> यदि आपने uuid इंस्टॉल नहीं किया है प्रारंभ में, pip install uuid चलाएँ . इसके बाद Chat.py में, UUID आयात करें और /token को अपडेट करें नीचे दिए गए कोड के साथ मार्ग:
from fastapi import APIRouter, FastAPI, WebSocket, Request, BackgroundTasks, HTTPException
import uuid
# @route POST /token
# @desc Route generating chat token
# @access Public
@chat.post("/token")
async def token_generator(name: str, request: Request):
if name == "":
raise HTTPException(status_code=400, detail={
"loc": "name", "msg": "Enter a valid name"})
token = str(uuid.uuid4())
data = {"name": name, "token": token}
return data
<पी> उपरोक्त कोड में, ग्राहक अपना नाम प्रदान करता है, जो आवश्यक है। हम यह सुनिश्चित करने के लिए त्वरित जांच करते हैं कि नाम फ़ील्ड खाली नहीं है, फिर uuid4 का उपयोग करके एक टोकन उत्पन्न करते हैं। पी> <पी> सत्र डेटा नाम और टोकन के लिए एक सरल शब्दकोश है। अंततः हमें इस सत्र डेटा को जारी रखना होगा और एक टाइमआउट सेट करना होगा, लेकिन अभी हम इसे क्लाइंट को लौटा देंगे। पोस्टमैन के साथ एपीआई का परीक्षण कैसे करें
<पी> क्योंकि हम एक वेबसॉकेट एंडपॉइंट का परीक्षण करेंगे, हमें पोस्टमैन जैसे टूल का उपयोग करने की आवश्यकता है जो इसकी अनुमति देता है (क्योंकि फास्टएपीआई पर डिफ़ॉल्ट स्वैगर डॉक्स वेबसॉकेट का समर्थन नहीं करता है)। पी> <पी> पोस्टमैन में, अपने विकास परिवेश के लिए एक संग्रह बनाएं और localhost:3500/token पर एक POST अनुरोध भेजें नाम को क्वेरी पैरामीटर के रूप में निर्दिष्ट करना और उसे एक मान पास करना। आपको नीचे दिखाए अनुसार प्रतिक्रिया मिलनी चाहिए: <पी>
टोकन जेनरेटर पोस्टमैन पी> वेबसॉकेट और कनेक्शन प्रबंधक
<पी> Src रूट में, socket नामक एक नया फ़ोल्डर बनाएं और connection.py नाम की एक फ़ाइल जोड़ें . इस फ़ाइल में, हम उस वर्ग को परिभाषित करेंगे जो हमारे वेबसॉकेट के कनेक्शन को नियंत्रित करता है, और कनेक्ट करने और डिस्कनेक्ट करने के लिए सभी सहायक तरीकों को नियंत्रित करता है। पी> <पी> connection.py में नीचे कोड जोड़ें:
from fastapi import WebSocket
class ConnectionManager:
def __init__(self):
self.active_connections: List[WebSocket] = []
async def connect(self, websocket: WebSocket):
await websocket.accept()
self.active_connections.append(websocket)
def disconnect(self, websocket: WebSocket):
self.active_connections.remove(websocket)
async def send_personal_message(self, message: str, websocket: WebSocket):
await websocket.send_text(message)
<पी> ConnectionManager क्लास को active_connections से आरंभ किया गया है विशेषता जो सक्रिय कनेक्शनों की एक सूची है। पी> <पी> फिर एसिंक्रोनस connect विधि WebSocket स्वीकार करेगी और इसे सक्रिय कनेक्शन की सूची में जोड़ें, जबकि disconnect विधि Websocket को हटा देगी सक्रिय कनेक्शनों की सूची से. पी> <पी> अंत में, send_personal_message विधि एक संदेश और Websocket लेगी हम संदेश भेजना चाहते हैं और संदेश को एसिंक्रोनस रूप से भेजना चाहते हैं। <पी> वेबसॉकेट एक बहुत व्यापक विषय है और हमने यहां केवल सतह को खंगाला है। हालाँकि, यह एकाधिक कनेक्शन बनाने और उन कनेक्शनों के संदेशों को अतुल्यकालिक रूप से संभालने के लिए पर्याप्त होना चाहिए। पी> <पी> आप फास्टएपीआई वेबसॉकेट और सॉकेट प्रोग्रामिंग के बारे में अधिक पढ़ सकते हैं।पी> <पी> ConnectionManager का उपयोग करने के लिए , आयात करें और इसे src.routes.chat.py के भीतर प्रारंभ करें , और /chat को अपडेट करें नीचे दिए गए कोड के साथ वेबसॉकेट मार्ग: from ..socket.connection import ConnectionManager
manager = ConnectionManager()
@chat.websocket("/chat")
async def websocket_endpoint(websocket: WebSocket):
await manager.connect(websocket)
try:
while True:
data = await websocket.receive_text()
print(data)
await manager.send_personal_message(f"Response: Simulating response from the GPT service", websocket)
except WebSocketDisconnect:
manager.disconnect(websocket)
<पी> websocket_endpoint में फ़ंक्शन, जो एक वेबसॉकेट लेता है, हम कनेक्शन मैनेजर में नया वेबसॉकेट जोड़ते हैं और while True चलाते हैं लूप, यह सुनिश्चित करने के लिए कि सॉकेट खुला रहे। सिवाय इसके कि जब सॉकेट डिस्कनेक्ट हो जाए। पी> <पी> जब कनेक्शन खुला होता है, तो हमें क्लाइंट द्वारा websocket.receive_test() के साथ भेजा गया कोई भी संदेश प्राप्त होता है और उन्हें अभी के लिए टर्मिनल पर प्रिंट करें। पी> <पी> फिर हम अभी के लिए क्लाइंट को एक हार्ड-कोडित प्रतिक्रिया भेजते हैं। अंततः ग्राहकों से प्राप्त संदेश एआई मॉडल को भेजा जाएगा, और ग्राहक को वापस भेजी गई प्रतिक्रिया एआई मॉडल की प्रतिक्रिया होगी। <पी> पोस्टमैन में, हम एक नया वेबसॉकेट अनुरोध बनाकर और वेबसॉकेट एंडपॉइंट localhost:3500/chat से कनेक्ट करके इस एंडपॉइंट का परीक्षण कर सकते हैं। . पी> <पी> जब आप कनेक्ट पर क्लिक करते हैं, तो संदेश फलक दिखाएगा कि एपीआई क्लाइंट यूआरएल से जुड़ा है, और एक सॉकेट खुला है। पी> <पी> इसका परीक्षण करने के लिए, चैट सर्वर पर एक संदेश "हैलो बॉट" भेजें, और आपको तत्काल परीक्षण प्रतिक्रिया "प्रतिक्रिया:जीपीटी सेवा से अनुकरण प्रतिक्रिया" प्राप्त होनी चाहिए जैसा कि नीचे दिखाया गया है: <पी>
पोस्टमैन चैट टेस्ट पी> FastAPI में निर्भरता इंजेक्शन
<पी> दो अलग-अलग क्लाइंट सत्रों के बीच अंतर करने और चैट सत्रों को सीमित करने में सक्षम होने के लिए, हम एक समयबद्ध टोकन का उपयोग करेंगे, जिसे वेबसॉकेट कनेक्शन के लिए एक क्वेरी पैरामीटर के रूप में पारित किया जाएगा। पी> <पी> सॉकेट फ़ोल्डर में, utils.py नाम की एक फ़ाइल बनाएं फिर नीचे कोड जोड़ें: from fastapi import WebSocket, status, Query
from typing import Optional
async def get_token(
websocket: WebSocket,
token: Optional[str] = Query(None),
):
if token is None or token == "":
await websocket.close(code=status.WS_1008_POLICY_VIOLATION)
return token
<पी> Get_token फ़ंक्शन एक WebSocket और टोकन प्राप्त करता है, फिर जाँचता है कि टोकन कोई नहीं है या शून्य है। पी> <पी> यदि यह मामला है, तो फ़ंक्शन नीति उल्लंघन स्थिति लौटाता है और यदि उपलब्ध है, तो फ़ंक्शन केवल टोकन लौटाता है। हम अंततः बाद में अतिरिक्त टोकन सत्यापन के साथ इस फ़ंक्शन का विस्तार करेंगे। <पी> इस फ़ंक्शन का उपभोग करने के लिए, हम इसे /chat में इंजेक्ट करते हैं मार्ग. फास्टएपीआई निर्भरता को आसानी से इंजेक्ट करने के लिए डिपेंड्स क्लास प्रदान करता है, इसलिए हमें डेकोरेटर्स के साथ छेड़छाड़ नहीं करनी पड़ती है। पी> <पी> /chat को अपडेट करें निम्नलिखित के लिए मार्ग: from ..socket.utils import get_token
@chat.websocket("/chat")
async def websocket_endpoint(websocket: WebSocket, token: str = Depends(get_token)):
await manager.connect(websocket)
try:
while True:
data = await websocket.receive_text()
print(data)
await manager.send_personal_message(f"Response: Simulating response from the GPT service", websocket)
except WebSocketDisconnect:
manager.disconnect(websocket)
<पी> अब जब आप /chat से कनेक्ट करने का प्रयास करेंगे पोस्टमैन में एंडपॉइंट, आपको 403 त्रुटि मिलेगी। क्वेरी पैरामीटर के रूप में एक टोकन प्रदान करें और अभी के लिए टोकन का कोई भी मूल्य प्रदान करें। फिर आपको पहले की तरह कनेक्ट करने में सक्षम होना चाहिए, केवल अब कनेक्शन के लिए टोकन की आवश्यकता है। <पी>
टोकन के साथ डाकिया चैट परीक्षण पी> <पी> यहां तक पहुंचने के लिए बधाई! आपका chat.py फ़ाइल अब इस तरह दिखनी चाहिए: import os
from fastapi import APIRouter, FastAPI, WebSocket, WebSocketDisconnect, Request, Depends, HTTPException
import uuid
from ..socket.connection import ConnectionManager
from ..socket.utils import get_token
chat = APIRouter()
manager = ConnectionManager()
# @route POST /token
# @desc Route to generate chat token
# @access Public
@chat.post("/token")
async def token_generator(name: str, request: Request):
token = str(uuid.uuid4())
if name == "":
raise HTTPException(status_code=400, detail={
"loc": "name", "msg": "Enter a valid name"})
data = {"name": name, "token": token}
return data
# @route POST /refresh_token
# @desc Route to refresh token
# @access Public
@chat.post("/refresh_token")
async def refresh_token(request: Request):
return None
# @route Websocket /chat
# @desc Socket for chatbot
# @access Public
@chat.websocket("/chat")
async def websocket_endpoint(websocket: WebSocket, token: str = Depends(get_token)):
await manager.connect(websocket)
try:
while True:
data = await websocket.receive_text()
print(data)
await manager.send_personal_message(f"Response: Simulating response from the GPT service", websocket)
except WebSocketDisconnect:
manager.disconnect(websocket)
<पी> इस ट्यूटोरियल के अगले भाग में, हम अपने एप्लिकेशन की स्थिति को संभालने और क्लाइंट और सर्वर के बीच डेटा पास करने पर ध्यान केंद्रित करेंगे। रेडिस के साथ रीयल-टाइम सिस्टम कैसे बनाएं
<पी> हमारा एप्लिकेशन वर्तमान में किसी भी स्थिति को संग्रहीत नहीं करता है, और उपयोगकर्ताओं की पहचान करने या चैट डेटा को संग्रहीत करने और पुनर्प्राप्त करने का कोई तरीका नहीं है। हम चैट सत्र के दौरान क्लाइंट को हार्ड-कोडित प्रतिक्रिया भी लौटा रहे हैं। पी> <पी> ट्यूटोरियल के इस भाग में, हम निम्नलिखित को कवर करेंगे: - रेडिस क्लस्टर से कैसे जुड़ें पायथन में और एक रेडिस क्लाइंट सेट करें
- Redis JSON के साथ डेटा कैसे संग्रहीत और पुनर्प्राप्त करें
- कैसे सेट अप करें रेडिस स्ट्रीम एक वेब सर्वर और कार्यकर्ता वातावरण के बीच संदेश कतार के रूप में
रेडिस और वितरित मैसेजिंग कतारें
<पी> रेडिस एक ओपन सोर्स इन-मेमोरी डेटा स्टोर है जिसे आप डेटाबेस, कैश, मैसेज ब्रोकर और स्ट्रीमिंग इंजन के रूप में उपयोग कर सकते हैं। यह कई डेटा संरचनाओं का समर्थन करता है और वास्तविक समय क्षमताओं के साथ वितरित अनुप्रयोगों के लिए एक आदर्श समाधान है। पी> <पी> रेडिस एंटरप्राइज क्लाउड रेडिस द्वारा प्रदान की गई एक पूरी तरह से प्रबंधित क्लाउड सेवा है जो बुनियादी ढांचे के बारे में चिंता किए बिना रेडिस क्लस्टर को अनंत पैमाने पर तैनात करने में हमारी मदद करती है। पी> <पी> हम इस ट्यूटोरियल के लिए निःशुल्क रेडिस एंटरप्राइज़ क्लाउड इंस्टेंस का उपयोग करेंगे। आप यहां रेडिस क्लाउड के साथ निःशुल्क शुरुआत कर सकते हैं और रेडिस डेटाबेस और रेडिस इनसाइट, रेडिस के साथ इंटरैक्ट करने के लिए एक जीयूआई स्थापित करने के लिए इस ट्यूटोरियल का अनुसरण कर सकते हैं। <पी> एक बार जब आप अपना रेडिस डेटाबेस सेट कर लें, तो प्रोजेक्ट रूट में (सर्वर फ़ोल्डर के बाहर) worker नामक एक नया फ़ोल्डर बनाएं। . पी> <पी> हम अपने कार्यकर्ता वातावरण को वेब सर्वर से अलग कर देंगे ताकि जब ग्राहक हमारे वेबसॉकेट को एक संदेश भेजता है, तो वेब सर्वर को तीसरे पक्ष की सेवा के अनुरोध को संभालना न पड़े। साथ ही, अन्य उपयोगकर्ताओं के लिए संसाधन मुक्त किये जा सकते हैं। पी> <पी> अनुमान एपीआई के साथ पृष्ठभूमि संचार को रेडिस के माध्यम से इस कार्यकर्ता सेवा द्वारा नियंत्रित किया जाता है। <पी> सभी जुड़े हुए ग्राहकों के अनुरोधों को संदेश कतार (निर्माता) में जोड़ दिया जाता है, जबकि कार्यकर्ता संदेशों का उपभोग करता है, अनुरोधों को अनुमान एपीआई में भेजता है, और प्रतिक्रिया को प्रतिक्रिया कतार में जोड़ता है। पी> <पी> एक बार जब एपीआई को प्रतिक्रिया मिल जाती है, तो वह उसे क्लाइंट को वापस भेज देती है। पी> <पी> निर्माता और उपभोक्ता के बीच यात्रा के दौरान, ग्राहक कई संदेश भेज सकता है, और इन संदेशों को कतारबद्ध किया जाएगा और क्रम में जवाब दिया जाएगा। पी> <पी> आदर्श रूप से, हम इस कार्यकर्ता को अपने स्वयं के वातावरण में, पूरी तरह से अलग सर्वर पर चला सकते हैं, लेकिन अभी के लिए, हम अपनी स्थानीय मशीन पर इसका अपना पायथन वातावरण बनाएंगे। <पी> आप सोच रहे होंगे –हमें कार्यकर्ता की आवश्यकता क्यों है? ऐसे परिदृश्य की कल्पना करें जहां वेब सर्वर तृतीय-पक्ष सेवा के लिए अनुरोध भी बनाता है। इसका मतलब यह है कि सॉकेट कनेक्शन के दौरान तीसरे पक्ष की सेवा से प्रतिक्रिया की प्रतीक्षा करते समय, सर्वर अवरुद्ध हो जाता है और एपीआई से प्रतिक्रिया प्राप्त होने तक संसाधन बंधे रहते हैं। पी> <पी> आप एक यादृच्छिक स्लीप time.sleep(10) बनाकर इसे आज़मा सकते हैं हार्ड-कोडित प्रतिक्रिया भेजने और एक नया संदेश भेजने से पहले। फिर एक नए पोस्टमैन सत्र में एक अलग टोकन से जुड़ने का प्रयास करें। पी> <पी> आप देखेंगे कि चैट सत्र तब तक कनेक्ट नहीं होगा जब तक यादृच्छिक नींद का समय समाप्त नहीं हो जाता। <पी> हालाँकि हम अधिक उत्पादन-केंद्रित सर्वर सेट-अप में अतुल्यकालिक तकनीकों और वर्कर पूल का उपयोग कर सकते हैं, लेकिन एक साथ उपयोगकर्ताओं की संख्या बढ़ने के कारण यह भी पर्याप्त नहीं होगा। पी> <पी> अंततः, हम अपने चैट एपीआई और तृतीय-पक्ष एपीआई के बीच संचार को बढ़ावा देने के लिए रेडिस का उपयोग करके वेब सर्वर संसाधनों को बांधने से बचना चाहते हैं। <पी> इसके बाद एक नया टर्मिनल खोलें, वर्कर फ़ोल्डर में सीडी डालें और एक नया पायथन वर्चुअल वातावरण बनाएं और सक्रिय करें जैसा कि हमने भाग 1 में किया था। <पी> इसके बाद, निम्नलिखित निर्भरताएँ स्थापित करें: pip install aiohttp aioredis python-dotenv
रेडिस क्लाइंट के साथ पायथन में रेडिस क्लस्टर से कैसे जुड़ें
<पी> हम Redis डेटाबेस से जुड़ने के लिए aioredis क्लाइंट का उपयोग करेंगे। हम हगिंगफेस अनुमान एपीआई को अनुरोध भेजने के लिए अनुरोध लाइब्रेरी का भी उपयोग करेंगे। पी> <पी> दो फ़ाइलें .env बनाएं , और main.py . फिर src नाम का एक फोल्डर बनाएं . साथ ही, redis नाम का एक फोल्डर बनाएं और config.py नामक एक नई फ़ाइल जोड़ें . पी> <पी> .env में फ़ाइल, निम्नलिखित कोड जोड़ें - और सुनिश्चित करें कि आपने फ़ील्ड को अपने रेडिस क्लस्टर में दिए गए क्रेडेंशियल्स के साथ अपडेट किया है। export REDIS_URL=<REDIS URL PROVIDED IN REDIS CLOUD>
export REDIS_USER=<REDIS USER IN REDIS CLOUD>
export REDIS_PASSWORD=<DATABASE PASSWORD IN REDIS CLOUD>
export REDIS_HOST=<REDIS HOST IN REDIS CLOUD>
export REDIS_PORT=<REDIS PORT IN REDIS CLOUD>
<पी> config.py में नीचे Redis क्लास जोड़ें: import os
from dotenv import load_dotenv
import aioredis
load_dotenv()
class Redis():
def __init__(self):
"""initialize connection """
self.REDIS_URL = os.environ['REDIS_URL']
self.REDIS_PASSWORD = os.environ['REDIS_PASSWORD']
self.REDIS_USER = os.environ['REDIS_USER']
self.connection_url = f"redis://{self.REDIS_USER}:{self.REDIS_PASSWORD}@{self.REDIS_URL}"
async def create_connection(self):
self.connection = aioredis.from_url(
self.connection_url, db=0)
return self.connection
<पी> हम एक रेडिस ऑब्जेक्ट बनाते हैं और पर्यावरण चर से आवश्यक पैरामीटर प्रारंभ करते हैं। फिर हम एक एसिंक्रोनस विधि create_connection बनाते हैं रेडिस कनेक्शन बनाने और aioredis से प्राप्त कनेक्शन पूल को वापस करने के लिए विधि from_url . <पी> इसके बाद, हम नीचे दिए गए कोड को चलाकर main.py में Redis कनेक्शन का परीक्षण करते हैं। यह एक नया रेडिस कनेक्शन पूल बनाएगा, एक साधारण कुंजी "कुंजी" सेट करेगा, और इसे एक स्ट्रिंग "मान" निर्दिष्ट करेगा।
from src.redis.config import Redis
import asyncio
async def main():
redis = Redis()
redis = await redis.create_connection()
print(redis)
await redis.set("key", "value")
if __name__ == "__main__":
asyncio.run(main())
<पी> अब Redis Insight खोलें (यदि आपने इसे डाउनलोड और इंस्टॉल करने के लिए ट्यूटोरियल का अनुसरण किया है) तो आपको कुछ इस तरह देखना चाहिए: <पी>
रेडिस इनसाइट टेस्ट पी> रेडिस स्ट्रीम के साथ कैसे काम करें
<पी> अब जबकि हमारे पास अपना कार्यकर्ता परिवेश सेटअप है, हम वेब सर्वर पर एक निर्माता और कार्यकर्ता पर एक उपभोक्ता बना सकते हैं। पी> <पी> सबसे पहले, आइए सर्वर पर फिर से अपनी रेडिस क्लास बनाएं। server.src में redis नाम का एक फ़ोल्डर बनाएं और दो फ़ाइलें जोड़ें, config.py और producer.py . पी> <पी> config.py में , नीचे दिए गए कोड को जोड़ें जैसा कि हमने कार्यकर्ता परिवेश के लिए किया था: import os
from dotenv import load_dotenv
import aioredis
load_dotenv()
class Redis():
def __init__(self):
"""initialize connection """
self.REDIS_URL = os.environ['REDIS_URL']
self.REDIS_PASSWORD = os.environ['REDIS_PASSWORD']
self.REDIS_USER = os.environ['REDIS_USER']
self.connection_url = f"redis://{self.REDIS_USER}:{self.REDIS_PASSWORD}@{self.REDIS_URL}"
async def create_connection(self):
self.connection = aioredis.from_url(
self.connection_url, db=0)
return self.connection
<पी> .env फ़ाइल में, Redis क्रेडेंशियल भी जोड़ें: export REDIS_URL=<REDIS URL PROVIDED IN REDIS CLOUD>
export REDIS_USER=<REDIS USER IN REDIS CLOUD>
export REDIS_PASSWORD=<DATABASE PASSWORD IN REDIS CLOUD>
export REDIS_HOST=<REDIS HOST IN REDIS CLOUD>
export REDIS_PORT=<REDIS PORT IN REDIS CLOUD>
<पी> अंत में, server.src.redis.producer.py में निम्नलिखित कोड जोड़ें:
from .config import Redis
class Producer:
def __init__(self, redis_client):
self.redis_client = redis_client
async def add_to_stream(self, data: dict, stream_channel):
try:
msg_id = await self.redis_client.xadd(name=stream_channel, id="*", fields=data)
print(f"Message id {msg_id} added to {stream_channel} stream")
return msg_id
except Exception as e:
print(f"Error sending msg to stream => {e}")
<पी> हमने एक निर्माता वर्ग बनाया है जिसे रेडिस क्लाइंट के साथ प्रारंभ किया गया है। हम इस क्लाइंट का उपयोग add_to_stream के साथ स्ट्रीम में डेटा जोड़ने के लिए करते हैं विधि, जो डेटा और रेडिस चैनल का नाम लेती है। पी> <पी> स्ट्रीम चैनल में डेटा जोड़ने के लिए रेडिस कमांड xadd है और इसमें एयोरेडिस में उच्च-स्तरीय और निम्न-स्तरीय दोनों कार्य हैं। <पी> इसके बाद, हमारे नव निर्मित प्रोड्यूसर को चलाने के लिए, chat.py को अपडेट करें और वेबसॉकेट /chat समापन बिंदु नीचे की तरह। अद्यतन चैनल नाम message_channel पर ध्यान दें .
from ..redis.producer import Producer
from ..redis.config import Redis
chat = APIRouter()
manager = ConnectionManager()
redis = Redis()
@chat.websocket("/chat")
async def websocket_endpoint(websocket: WebSocket, token: str = Depends(get_token)):
await manager.connect(websocket)
redis_client = await redis.create_connection()
producer = Producer(redis_client)
try:
while True:
data = await websocket.receive_text()
print(data)
stream_data = {}
stream_data[token] = data
await producer.add_to_stream(stream_data, "message_channel")
await manager.send_personal_message(f"Response: Simulating response from the GPT service", websocket)
except WebSocketDisconnect:
manager.disconnect(websocket)
<पी> इसके बाद, पोस्टमैन में, एक कनेक्शन बनाएं और Hello लिखे हुए कितने भी संदेश भेजें . आपके स्ट्रीम संदेश टर्मिनल पर नीचे की तरह मुद्रित होने चाहिए: <पी>
टर्मिनल चैनल संदेश परीक्षण पी> <पी> रेडिस इनसाइट में, आपको एक नया mesage_channel दिखाई देगा बनाई गई और क्लाइंट से भेजे गए संदेशों से भरी एक टाइम-स्टैम्प्ड कतार। यह टाइमस्टैम्प्ड कतार संदेशों के क्रम को संरक्षित करने के लिए महत्वपूर्ण है। <पी>
रेडिस इनसाइट चैनल पी> चैट डेटा को मॉडल कैसे करें
<पी> इसके बाद, हम अपने चैट संदेशों के लिए एक मॉडल बनाएंगे। याद रखें कि हम वेबसॉकेट पर टेक्स्ट डेटा भेज रहे हैं, लेकिन हमारे चैट डेटा में केवल टेक्स्ट से अधिक जानकारी होनी चाहिए। हमें चैट भेजे जाने पर टाइमस्टैंप लगाने, प्रत्येक संदेश के लिए एक आईडी बनाने और चैट सत्र के बारे में डेटा एकत्र करने की आवश्यकता है, फिर इस डेटा को JSON प्रारूप में संग्रहीत करना होगा। पी> <पी> हम इस JSON डेटा को Redis में संग्रहीत कर सकते हैं ताकि कनेक्शन खो जाने पर हम चैट इतिहास न खोएं, क्योंकि हमारा WebSocket स्थिति संग्रहीत नहीं करता है। <पी> server.src में schema नामक एक नया फ़ोल्डर बनाएं . फिर chat.py नाम की एक फ़ाइल बनाएं server.src.schema में निम्नलिखित कोड जोड़ें: from datetime import datetime
from pydantic import BaseModel
from typing import List, Optional
import uuid
class Message(BaseModel):
id = uuid.uuid4()
msg: str
timestamp = str(datetime.now())
class Chat(BaseModel):
token: str
messages: List[Message]
name: str
session_start = str(datetime.now())
<पी> हम पाइडेंटिक के BaseModel का उपयोग कर रहे हैं चैट डेटा को मॉडल करने के लिए क्लास। Chat कक्षा एकल चैट सत्र के बारे में डेटा रखेगी। यह datetime.now() का उपयोग करके चैट सत्र प्रारंभ समय के लिए टोकन, उपयोगकर्ता का नाम और स्वचालित रूप से जेनरेट किया गया टाइमस्टैम्प संग्रहीत करेगा। . पी> <पी> इस चैट सत्र में भेजे और प्राप्त किए गए संदेशों को Message के साथ संग्रहीत किया जाता है क्लास जो uuid4 का उपयोग करके तुरंत एक चैट आईडी बनाती है . इस Message को आरंभ करते समय हमें केवल एक ही डेटा प्रदान करना होगा वर्ग संदेश पाठ है। रेडिस JSON के साथ कैसे काम करें
<पी> अपने चैट इतिहास को संग्रहीत करने के लिए Redis JSON की क्षमता का उपयोग करने के लिए, हमें Redis लैब्स द्वारा प्रदान किए गए rejson को इंस्टॉल करना होगा। पी> <पी> टर्मिनल में, server पर सीडी डालें और pip install rejson के साथ rejson इंस्टॉल करें . फिर अपना Redis अपडेट करें server.src.redis.config.py में कक्षा create_rejson_connection को शामिल करने के लिए विधि:
import os
from dotenv import load_dotenv
import aioredis
from rejson import Client
load_dotenv()
class Redis():
def __init__(self):
"""initialize connection """
self.REDIS_URL = os.environ['REDIS_URL']
self.REDIS_PASSWORD = os.environ['REDIS_PASSWORD']
self.REDIS_USER = os.environ['REDIS_USER']
self.connection_url = f"redis://{self.REDIS_USER}:{self.REDIS_PASSWORD}@{self.REDIS_URL}"
self.REDIS_HOST = os.environ['REDIS_HOST']
self.REDIS_PORT = os.environ['REDIS_PORT']
async def create_connection(self):
self.connection = aioredis.from_url(
self.connection_url, db=0)
return self.connection
def create_rejson_connection(self):
self.redisJson = Client(host=self.REDIS_HOST,
port=self.REDIS_PORT, decode_responses=True, username=self.REDIS_USER, password=self.REDIS_PASSWORD)
return self.redisJson
<पी> हम create_rejson_connection जोड़ रहे हैं रेज़सन Client के साथ रेडिस से जुड़ने की विधि . यह हमें Redis में JSON डेटा बनाने और हेरफेर करने की विधियाँ देता है, जो aioredis के पास उपलब्ध नहीं हैं। <पी> अगला, server.src.routes.chat.py में हम /token को अपडेट कर सकते हैं एक नया Chat बनाने के लिए समापन बिंदु उदाहरण और सत्र डेटा को Redis JSON में इस प्रकार संग्रहीत करें: @chat.post("/token")
async def token_generator(name: str, request: Request):
token = str(uuid.uuid4())
if name == "":
raise HTTPException(status_code=400, detail={
"loc": "name", "msg": "Enter a valid name"})
# Create new chat session
json_client = redis.create_rejson_connection()
chat_session = Chat(
token=token,
messages=[],
name=name
)
# Store chat session in redis JSON with the token as key
json_client.jsonset(str(token), Path.rootPath(), chat_session.dict())
# Set a timeout for redis data
redis_client = await redis.create_connection()
await redis_client.expire(str(token), 3600)
return chat_session.dict()
<पी> नोट:क्योंकि यह एक डेमो ऐप है, मैं चैट डेटा को रेडिस में बहुत लंबे समय तक संग्रहीत नहीं करना चाहता। इसलिए मैंने एओरेडिस क्लाइंट का उपयोग करके टोकन पर 60 मिनट का टाइम आउट जोड़ा है (रेजसन टाइमआउट लागू नहीं करता है)। इसका मतलब है कि 60 मिनट के बाद चैट सेशन का डेटा खत्म हो जाएगा। पी> <पी> यह आवश्यक है क्योंकि हम उपयोगकर्ताओं को प्रमाणित नहीं कर रहे हैं, और हम एक निर्धारित अवधि के बाद चैट डेटा को डंप करना चाहते हैं। यह चरण वैकल्पिक है, और आपको इसे शामिल करने की आवश्यकता नहीं है। <पी> इसके बाद, पोस्टमैन में, जब आप एक नया टोकन बनाने के लिए POST अनुरोध भेजते हैं, तो आपको नीचे दी गई तरह एक संरचित प्रतिक्रिया मिलेगी। आप JSON कुंजी के रूप में टोकन के साथ संग्रहीत अपने चैट डेटा और मूल्य के रूप में डेटा को देखने के लिए रेडिस इनसाइट की भी जांच कर सकते हैं। <पी>
टोकन जेनरेटर अपडेट किया गया पी> टोकन निर्भरता को कैसे अपडेट करें
<पी> अब जब हमारे पास एक टोकन तैयार और संग्रहीत किया जा रहा है, तो यह get_token को अपडेट करने का एक अच्छा समय है। हमारे /chat में निर्भरता वेबसॉकेट। हम चैट सत्र शुरू करने से पहले वैध टोकन की जांच करने के लिए ऐसा करते हैं। पी> <पी> server.src.socket.utils.py में get_token को अद्यतन करें यह जाँचने के लिए फ़ंक्शन कि टोकन Redis उदाहरण में मौजूद है या नहीं। यदि ऐसा होता है तो हम टोकन लौटा देते हैं, जिसका अर्थ है कि सॉकेट कनेक्शन वैध है। यदि यह मौजूद नहीं है, तो हम कनेक्शन बंद कर देते हैं। पी> <पी> /token द्वारा बनाया गया टोकन 60 मिनट के बाद अस्तित्व समाप्त हो जाएगा। इसलिए यदि चैट शुरू करने का प्रयास करते समय कोई त्रुटि प्रतिक्रिया उत्पन्न होती है, तो उपयोगकर्ता को नया टोकन उत्पन्न करने के लिए पुनर्निर्देशित करने के लिए हमारे पास फ्रंटएंड पर कुछ सरल तर्क हो सकते हैं।
from ..redis.config import Redis
async def get_token(
websocket: WebSocket,
token: Optional[str] = Query(None),
):
if token is None or token == "":
await websocket.close(code=status.WS_1008_POLICY_VIOLATION)
redis_client = await redis.create_connection()
isexists = await redis_client.exists(token)
if isexists == 1:
return token
else:
await websocket.close(code=status.WS_1008_POLICY_VIOLATION, reason="Session not authenticated or expired token")
<पी> निर्भरता का परीक्षण करने के लिए, हमारे द्वारा उपयोग किए जा रहे यादृच्छिक टोकन के साथ चैट सत्र से जुड़ें, और आपको 403 त्रुटि मिलनी चाहिए। (ध्यान दें कि आपको Redis Insight में टोकन को मैन्युअल रूप से हटाना होगा।) <पी> अब जब आपने पोस्ट अनुरोध को /token पर भेजा था तो उत्पन्न टोकन को कॉपी करें एंडपॉइंट (या एक नया अनुरोध बनाएं) और इसे /chat के लिए आवश्यक टोकन क्वेरी पैरामीटर के मान के रूप में पेस्ट करें वेबसॉकेट। फिर कनेक्ट करें. आपको एक सफल कनेक्शन मिलना चाहिए. <पी>
टोकन के साथ चैट सत्र पी> <पी> यह सब एक साथ लाने पर, आपकी चैट.पी नीचे की तरह दिखनी चाहिए।
import os
from fastapi import APIRouter, FastAPI, WebSocket, WebSocketDisconnect, Request, Depends
import uuid
from ..socket.connection import ConnectionManager
from ..socket.utils import get_token
import time
from ..redis.producer import Producer
from ..redis.config import Redis
from ..schema.chat import Chat
from rejson import Path
chat = APIRouter()
manager = ConnectionManager()
redis = Redis()
# @route POST /token
# @desc Route to generate chat token
# @access Public
@chat.post("/token")
async def token_generator(name: str, request: Request):
token = str(uuid.uuid4())
if name == "":
raise HTTPException(status_code=400, detail={
"loc": "name", "msg": "Enter a valid name"})
# Create nee chat session
json_client = redis.create_rejson_connection()
chat_session = Chat(
token=token,
messages=[],
name=name
)
print(chat_session.dict())
# Store chat session in redis JSON with the token as key
json_client.jsonset(str(token), Path.rootPath(), chat_session.dict())
# Set a timeout for redis data
redis_client = await redis.create_connection()
await redis_client.expire(str(token), 3600)
return chat_session.dict()
# @route POST /refresh_token
# @desc Route to refresh token
# @access Public
@chat.post("/refresh_token")
async def refresh_token(request: Request):
return None
# @route Websocket /chat
# @desc Socket for chat bot
# @access Public
@chat.websocket("/chat")
async def websocket_endpoint(websocket: WebSocket, token: str = Depends(get_token)):
await manager.connect(websocket)
redis_client = await redis.create_connection()
producer = Producer(redis_client)
json_client = redis.create_rejson_connection()
try:
while True:
data = await websocket.receive_text()
stream_data = {}
stream_data[token] = data
await producer.add_to_stream(stream_data, "message_channel")
await manager.send_personal_message(f"Response: Simulating response from the GPT service", websocket)
except WebSocketDisconnect:
manager.disconnect(websocket)
<पी> यहाँ तक पहुँचने पर शाबाश! अगले भाग में, हम एआई मॉडल के साथ संचार करने और क्लाइंट, सर्वर, वर्कर और बाहरी एपीआई के बीच डेटा ट्रांसफर को संभालने पर ध्यान केंद्रित करेंगे। एआई मॉडल के साथ चैटबॉट्स में इंटेलिजेंस कैसे जोड़ें
<पी> इस अनुभाग में, हम ट्रांसफॉर्मर मॉडल के साथ संचार करने के लिए एक रैपर बनाने, एक उपयोगकर्ता से एपीआई को संवादी प्रारूप में संकेत भेजने और हमारे चैट एप्लिकेशन के लिए प्रतिक्रियाएं प्राप्त करने और बदलने पर ध्यान केंद्रित करेंगे। हगिंगफेस के साथ शुरुआत कैसे करें
<पी> हम हगिनफेस पर कोई भाषा मॉडल नहीं बनाएंगे या तैनात नहीं करेंगे। इसके बजाय, हम पूर्व-प्रशिक्षित मॉडल से जुड़ने के लिए हगिंगफेस के त्वरित अनुमान एपीआई का उपयोग करने पर ध्यान केंद्रित करेंगे। पी> <पी> हम जिस मॉडल का उपयोग करेंगे वह EleutherAI द्वारा प्रदान किया गया GPT-J-6B मॉडल है। यह एक जेनरेटिव भाषा मॉडल है जिसे 6 बिलियन पैरामीटर्स के साथ प्रशिक्षित किया गया था। पी> <पी> हगिंगफेस हमें इस मॉडल से जुड़ने के लिए ऑन-डिमांड सीमित एपीआई बिल्कुल मुफ्त प्रदान करता है। <पी> हगिंगफेस के साथ शुरुआत करने के लिए, एक निःशुल्क खाता बनाएं। अपनी सेटिंग्स में, एक नया एक्सेस टोकन जनरेट करें। 30 हजार टोकन तक के लिए, हगिंगफेस मुफ्त में अनुमान एपीआई तक पहुंच प्रदान करता है। पी> <पी> आप यहां अपने एपीआई उपयोग की निगरानी कर सकते हैं। सुनिश्चित करें कि आप इस टोकन को सुरक्षित रखें और इसे सार्वजनिक रूप से उजागर न करें। <पी> नोट:हम एपीआई के साथ संचार करने के लिए HTTP कनेक्शन का उपयोग करेंगे क्योंकि हम एक निःशुल्क खाते का उपयोग कर रहे हैं। लेकिन PRO Huggingface खाता WebSockets के साथ समानता और बैच नौकरियों को देखने के लिए स्ट्रीमिंग का समर्थन करता है। पी> <पी> यह मॉडल और हमारे चैट एप्लिकेशन के बीच प्रतिक्रिया समय को बेहतर बनाने में काफी मदद कर सकता है, और मुझे उम्मीद है कि मैं इस पद्धति को एक अनुवर्ती लेख में शामिल करूंगा। भाषा मॉडल के साथ कैसे इंटरैक्ट करें
<पी> सबसे पहले, हम अपनी वर्कर निर्देशिका के भीतर .env फ़ाइल में हगिंगफेस कनेक्शन क्रेडेंशियल जोड़ते हैं। export HUGGINFACE_INFERENCE_TOKEN=<HUGGINGFACE ACCESS TOKEN>
export MODEL_URL=https://api-inference.huggingface.co/models/EleutherAI/gpt-j-6B
<पी> अगला, worker.src में model नाम का एक फ़ोल्डर बनाएं फिर एक फ़ाइल gptj.py जोड़ें . फिर नीचे GPT क्लास जोड़ें: import os
from dotenv import load_dotenv
import requests
import json
load_dotenv()
class GPT:
def __init__(self):
self.url = os.environ.get('MODEL_URL')
self.headers = {
"Authorization": f"Bearer {os.environ.get('HUGGINFACE_INFERENCE_TOKEN')}"}
self.payload = {
"inputs": "",
"parameters": {
"return_full_text": False,
"use_cache": True,
"max_new_tokens": 25
}
}
def query(self, input: str) -> list:
self.payload["inputs"] = input
data = json.dumps(self.payload)
response = requests.request(
"POST", self.url, headers=self.headers, data=data)
print(json.loads(response.content.decode("utf-8")))
return json.loads(response.content.decode("utf-8"))
if __name__ == "__main__":
GPT().query("Will artificial intelligence help humanity conquer the universe?")
<पी> GPT क्लास को हगिंगफेस मॉडल url के साथ आरंभ किया गया है , प्रमाणीकरण header , और पूर्वनिर्धारित payload . लेकिन पेलोड इनपुट एक गतिशील फ़ील्ड है जो query द्वारा प्रदान किया जाता है हगिंगफेस एंडपॉइंट पर अनुरोध भेजने से पहले विधि और अपडेट किया गया। <पी> अंत में, हम सीधे GPT क्लास के उदाहरण पर क्वेरी विधि चलाकर इसका परीक्षण करते हैं। टर्मिनल में, python src/model/gptj.py चलाएँ , और आपको इस तरह की प्रतिक्रिया मिलनी चाहिए (बस ध्यान रखें कि आपकी प्रतिक्रिया निश्चित रूप से इससे अलग होगी): [{'generated_text': ' (AI) could solve all the problems on this planet? I am of the opinion that in the short term artificial intelligence is much better than human beings, but in the long and distant future human beings will surpass artificial intelligence.\n\nIn the distant'}]
<पी> इसके बाद, हम इनपुट के प्रारूप को बदलकर मॉडल के साथ इंटरैक्शन को अधिक संवादी बनाने के लिए इनपुट में कुछ बदलाव जोड़ते हैं। पी> <पी> GPT को अपडेट करें कक्षा इस प्रकार:
class GPT:
def __init__(self):
self.url = os.environ.get('MODEL_URL')
self.headers = {
"Authorization": f"Bearer {os.environ.get('HUGGINFACE_INFERENCE_TOKEN')}"}
self.payload = {
"inputs": "",
"parameters": {
"return_full_text": False,
"use_cache": False,
"max_new_tokens": 25
}
}
def query(self, input: str) -> list:
self.payload["inputs"] = f"Human: {input} Bot:"
data = json.dumps(self.payload)
response = requests.request(
"POST", self.url, headers=self.headers, data=data)
data = json.loads(response.content.decode("utf-8"))
text = data[0]['generated_text']
res = str(text.split("Human:")[0]).strip("\n").strip()
return res
if __name__ == "__main__":
GPT().query("Will artificial intelligence help humanity conquer the universe?")
<पी> हमने इनपुट को एक स्ट्रिंग शाब्दिक f"Human: {input} Bot:" के साथ अद्यतन किया . मानव इनपुट को स्ट्रिंग में रखा गया है और बॉट एक प्रतिक्रिया प्रदान करता है। यह इनपुट प्रारूप GPT-J6B को एक संवादात्मक मॉडल में बदल देता है। अन्य परिवर्तन जो आप देख सकते हैं उनमें शामिल हैं - use_cache:यदि आप चाहते हैं कि इनपुट समान होने पर मॉडल एक नई प्रतिक्रिया बनाए, तो आप इसे गलत बना सकते हैं। मेरा सुझाव है कि यदि कोई उपयोगकर्ता एक ही संदेश के साथ बॉट को स्पैम करता रहता है तो आपके मुफ़्त टोकन को ख़त्म होने से बचाने के लिए उत्पादन में इसे सत्य के रूप में छोड़ दें। कैश का उपयोग करने से वास्तव में मॉडल से कोई नई प्रतिक्रिया लोड नहीं होती है।
- return_full_text:गलत है, क्योंकि हमें इनपुट वापस करने की आवश्यकता नहीं है - यह हमारे पास पहले से ही है। जब हमें कोई प्रतिक्रिया मिलती है, तो हम प्रतिक्रिया से "बॉट:" और अग्रणी/पिछली जगहों को हटा देते हैं और केवल प्रतिक्रिया पाठ लौटाते हैं।
एआई मॉडल के लिए अल्पकालिक मेमोरी का अनुकरण कैसे करें
<पी> हमारे द्वारा मॉडल को भेजे जाने वाले प्रत्येक नए इनपुट के लिए, मॉडल के पास वार्तालाप इतिहास को याद रखने का कोई तरीका नहीं है। यदि हम बातचीत में संदर्भ बनाए रखना चाहते हैं तो यह महत्वपूर्ण है। पी> <पी> लेकिन याद रखें कि जैसे-जैसे हम मॉडल को भेजे जाने वाले टोकन की संख्या बढ़ाते हैं, प्रसंस्करण अधिक महंगा हो जाता है, और प्रतिक्रिया समय भी लंबा हो जाता है। पी> <पी> इसलिए हमें अल्पकालिक इतिहास को पुनः प्राप्त करने और इसे मॉडल पर भेजने का एक तरीका खोजने की आवश्यकता होगी। हमें एक अच्छी बात का भी पता लगाना होगा - हम कितना ऐतिहासिक डेटा पुनः प्राप्त करना और मॉडल को भेजना चाहते हैं? <पी> चैट इतिहास को संभालने के लिए, हमें अपने JSON डेटाबेस पर वापस जाना होगा। हम token का उपयोग करेंगे अंतिम चैट डेटा प्राप्त करने के लिए, और फिर जब हमें प्रतिक्रिया मिलती है, तो प्रतिक्रिया को JSON डेटाबेस में जोड़ें। <पी> worker.src.redis.config.py को अपडेट करें create_rejson_connection को शामिल करने के लिए विधि. साथ ही, .env फ़ाइल को प्रमाणीकरण डेटा के साथ अद्यतन करें, और सुनिश्चित करें कि rejson स्थापित है। <पी> आपका worker.src.redis.config.py इस तरह दिखना चाहिए:
import os
from dotenv import load_dotenv
import aioredis
from rejson import Client
load_dotenv()
class Redis():
def __init__(self):
"""initialize connection """
self.REDIS_URL = os.environ['REDIS_URL']
self.REDIS_PASSWORD = os.environ['REDIS_PASSWORD']
self.REDIS_USER = os.environ['REDIS_USER']
self.connection_url = f"redis://{self.REDIS_USER}:{self.REDIS_PASSWORD}@{self.REDIS_URL}"
self.REDIS_HOST = os.environ['REDIS_HOST']
self.REDIS_PORT = os.environ['REDIS_PORT']
async def create_connection(self):
self.connection = aioredis.from_url(
self.connection_url, db=0)
return self.connection
def create_rejson_connection(self):
self.redisJson = Client(host=self.REDIS_HOST,
port=self.REDIS_PORT, decode_responses=True, username=self.REDIS_USER, password=self.REDIS_PASSWORD)
return self.redisJson
<पी> जबकि आपकी .env फ़ाइल इस तरह दिखनी चाहिए: export REDIS_URL=<REDIS URL PROVIDED IN REDIS CLOUD>
export REDIS_USER=<REDIS USER IN REDIS CLOUD>
export REDIS_PASSWORD=<DATABASE PASSWORD IN REDIS CLOUD>
export REDIS_HOST=<REDIS HOST IN REDIS CLOUD>
export REDIS_PORT=<REDIS PORT IN REDIS CLOUD>
export HUGGINFACE_INFERENCE_TOKEN=<HUGGINGFACE ACCESS TOKEN>
export MODEL_URL=https://api-inference.huggingface.co/models/EleutherAI/gpt-j-6B
<पी> अगला, worker.src.redis में cache.py नामक एक नई फ़ाइल बनाएं और नीचे कोड जोड़ें: from .config import Redis
from rejson import Path
class Cache:
def __init__(self, json_client):
self.json_client = json_client
async def get_chat_history(self, token: str):
data = self.json_client.jsonget(
str(token), Path.rootPath())
return data
<पी> कैश को रेज़सन क्लाइंट और विधि get_chat_history के साथ आरंभ किया गया है Redis से उस टोकन का चैट इतिहास प्राप्त करने के लिए एक टोकन लेता है। सुनिश्चित करें कि आप पथ ऑब्जेक्ट को rejson से आयात करते हैं। <पी> इसके बाद, worker.main.py को अपडेट करें नीचे दिए गए कोड के साथ: from src.redis.config import Redis
import asyncio
from src.model.gptj import GPT
from src.redis.cache import Cache
redis = Redis()
async def main():
json_client = redis.create_rejson_connection()
data = await Cache(json_client).get_chat_history(token="18196e23-763b-4808-ae84-064348a0daff")
print(data)
if __name__ == "__main__":
asyncio.run(main())
<पी> मैंने पोस्टमैन में पिछले परीक्षणों से बनाए गए एक नमूना टोकन को हार्ड-कोड किया है। यदि आपने कोई टोकन नहीं बनाया है, तो बस /token पर एक नया अनुरोध भेजें और टोकन कॉपी करें, फिर python main.py चलाएँ टर्मिनल में. आपको टर्मिनल में डेटा इस प्रकार देखना चाहिए: {'token': '18196e23-763b-4808-ae84-064348a0daff', 'messages': [], 'name': 'Stephen', 'session_start': '2022-07-16 13:20:01.092109'}
<पी> इसके बाद, हमें एक add_message_to_cache जोड़ना होगा हमारे Cache के लिए विधि क्लास जो एक विशिष्ट टोकन के लिए रेडिस में संदेश जोड़ता है।
async def add_message_to_cache(self, token: str, message_data: dict):
self.json_client.jsonarrappend(
str(token), Path('.messages'), message_data)
<पी> jsonarrappend रेज़सन द्वारा प्रदान की गई विधि नए संदेश को संदेश सरणी में जोड़ती है। पी> <पी> ध्यान दें कि संदेश सरणी तक पहुंचने के लिए, हमें .messages प्रदान करना होगा पथ के तर्क के रूप में। यदि आपके संदेश डेटा में एक अलग/नेस्टेड संरचना है, तो बस उस सरणी का पथ प्रदान करें जिसमें आप नया डेटा जोड़ना चाहते हैं। <पी> इस विधि का परीक्षण करने के लिए, नीचे दिए गए कोड के साथ main.py फ़ाइल में मुख्य फ़ंक्शन को अपडेट करें: async def main():
json_client = redis.create_rejson_connection()
await Cache(json_client).add_message_to_cache(token="18196e23-763b-4808-ae84-064348a0daff", message_data={
"id": "1",
"msg": "Hello",
"timestamp": "2022-07-16 13:20:01.092109"
})
data = await Cache(json_client).get_chat_history(token="18196e23-763b-4808-ae84-064348a0daff")
print(data)
<पी> हम कैश में एक हार्ड-कोडित संदेश भेज रहे हैं, और कैश से चैट इतिहास प्राप्त कर रहे हैं। जब आप python main.py चलाते हैं कार्यकर्ता निर्देशिका के भीतर टर्मिनल में, आपको संदेश सरणी में जोड़े गए संदेश के साथ, टर्मिनल में कुछ इस तरह मुद्रित होना चाहिए। {'token': '18196e23-763b-4808-ae84-064348a0daff', 'messages': [{'id': '1', 'msg': 'Hello', 'timestamp': '2022-07-16 13:20:01.092109'}], 'name': 'Stephen', 'session_start': '2022-07-16 13:20:01.092109'}
<पी> अंत में, हमें संदेश डेटा को GPT मॉडल पर भेजने के लिए मुख्य फ़ंक्शन को अपडेट करना होगा, और इनपुट को अंतिम 4 के साथ अपडेट करना होगा। क्लाइंट और मॉडल के बीच भेजे गए संदेश। पी> <पी> आइए सबसे पहले अपना add_message_to_cache अपडेट करें एक नए तर्क "स्रोत" के साथ कार्य करें जो हमें बताएगा कि संदेश मानव है या बॉट। फिर हम कैश में संग्रहीत करने से पहले डेटा में "मानव:" या "बॉट:" टैग जोड़ने के लिए इस तर्क का उपयोग कर सकते हैं। <पी> Update the add_message_to_cache method in the Cache class like so: async def add_message_to_cache(self, token: str, source: str, message_data: dict):
if source == "human":
message_data['msg'] = "Human: " + (message_data['msg'])
elif source == "bot":
message_data['msg'] = "Bot: " + (message_data['msg'])
self.json_client.jsonarrappend(
str(token), Path('.messages'), message_data)
<पी> Then update the main function in main.py in the worker directory, and run python main.py to see the new results in the Redis database. async def main():
json_client = redis.create_rejson_connection()
await Cache(json_client).add_message_to_cache(token="18196e23-763b-4808-ae84-064348a0daff", source="human", message_data={
"id": "1",
"msg": "Hello",
"timestamp": "2022-07-16 13:20:01.092109"
})
data = await Cache(json_client).get_chat_history(token="18196e23-763b-4808-ae84-064348a0daff")
print(data)
<पी> Next, we need to update the main function to add new messages to the cache, read the previous 4 messages from the cache, and then make an API call to the model using the query method. It'll have a payload consisting of a composite string of the last 4 messages. <पी> You can always tune the number of messages in the history you want to extract, but I think 4 messages is a pretty good number for a demo. <पी> In worker.src , create a new folder schema. Then create a new file named chat.py and paste our message schema in chat.py like so: from datetime import datetime
from pydantic import BaseModel
from typing import List, Optional
import uuid
class Message(BaseModel):
id = str(uuid.uuid4())
msg: str
timestamp = str(datetime.now())
<पी> Next, update the main.py file like below: async def main():
json_client = redis.create_rejson_connection()
await Cache(json_client).add_message_to_cache(token="18196e23-763b-4808-ae84-064348a0daff", source="human", message_data={
"id": "3",
"msg": "I would like to go to the moon to, would you take me?",
"timestamp": "2022-07-16 13:20:01.092109"
})
data = await Cache(json_client).get_chat_history(token="18196e23-763b-4808-ae84-064348a0daff")
print(data)
message_data = data['messages'][-4:]
input = ["" + i['msg'] for i in message_data]
input = " ".join(input)
res = GPT().query(input=input)
msg = Message(
msg=res
)
print(msg)
await Cache(json_client).add_message_to_cache(token="18196e23-763b-4808-ae84-064348a0daff", source="bot", message_data=msg.dict())
<पी> In the code above, we add new message data to the cache. This message will ultimately come from the message queue. Next we get the chat history from the cache, which will now include the most recent data we added. पी> <पी> Note that we are using the same hard-coded token to add to the cache and get from the cache, temporarily just to test this out. पी> <पी> Next, we trim off the cache data and extract only the last 4 items. Then we consolidate the input data by extracting the msg in a list and join it to an empty string. पी> <पी> Finally, we create a new Message instance for the bot response and add the response to the cache specifying the source as "bot" <पी> Next, run python main.py a couple of times, changing the human message and id as desired with each run. You should have a full conversation input and output with the model. पी> <पी> Open Redis Insight and you should have something similar to the below: <पी>
Conversational Chat पी> Stream Consumer and Real-time Data Pull from the Message Queue
<पी> Next, we want to create a consumer and update our worker.main.py to connect to the message queue. We want it to pull the token data in real-time, as we are currently hard-coding the tokens and message inputs. <पी> In worker.src.redis create a new file named stream.py . Add a StreamConsumer class with the code below: class StreamConsumer:
def __init__(self, redis_client):
self.redis_client = redis_client
async def consume_stream(self, count: int, block: int, stream_channel):
response = await self.redis_client.xread(
streams={stream_channel: '0-0'}, count=count, block=block)
return response
async def delete_message(self, stream_channel, message_id):
await self.redis_client.xdel(stream_channel, message_id)
<पी> The StreamConsumer class is initialized with a Redis client. The consume_stream method pulls a new message from the queue from the message channel, using the xread method provided by aioredis. <पी> Next, update the worker.main.py file with a while loop to keep the connection to the message channel alive, like so:
from src.redis.config import Redis
import asyncio
from src.model.gptj import GPT
from src.redis.cache import Cache
from src.redis.config import Redis
from src.redis.stream import StreamConsumer
import os
from src.schema.chat import Message
redis = Redis()
async def main():
json_client = redis.create_rejson_connection()
redis_client = await redis.create_connection()
consumer = StreamConsumer(redis_client)
cache = Cache(json_client)
print("Stream consumer started")
print("Stream waiting for new messages")
while True:
response = await consumer.consume_stream(stream_channel="message_channel", count=1, block=0)
if response:
for stream, messages in response:
# Get message from stream, and extract token, message data and message id
for message in messages:
message_id = message[0]
token = [k.decode('utf-8')
for k, v in message[1].items()][0]
message = [v.decode('utf-8')
for k, v in message[1].items()][0]
print(token)
# Create a new message instance and add to cache, specifying the source as human
msg = Message(msg=message)
await cache.add_message_to_cache(token=token, source="human", message_data=msg.dict())
# Get chat history from cache
data = await cache.get_chat_history(token=token)
# Clean message input and send to query
message_data = data['messages'][-4:]
input = ["" + i['msg'] for i in message_data]
input = " ".join(input)
res = GPT().query(input=input)
msg = Message(
msg=res
)
print(msg)
await cache.add_message_to_cache(token=token, source="bot", message_data=msg.dict())
# Delete messaage from queue after it has been processed
await consumer.delete_message(stream_channel="message_channel", message_id=message_id)
if __name__ == "__main__":
asyncio.run(main())
<पी> This is quite the update, so let's take it step by step: <पी> We use a while True loop so that the worker can be online listening to messages from the queue. पी> <पी> Next, we await new messages from the message_channel by calling our consume_stream method. If we have a message in the queue, we extract the message_id, token, and message. Then we create a new instance of the Message class, add the message to the cache, and then get the last 4 messages. We set it as input to the GPT model query method. पी> <पी> Once we get a response, we then add the response to the cache using the add_message_to_cache method, then delete the message from the queue. How to Update the Chat Client with the AI Response
<पी> So far, we are sending a chat message from the client to the message_channel (which is received by the worker that queries the AI model) to get a response. पी> <पी> Next, we need to send this response to the client. As long as the socket connection is still open, the client should be able to receive the response. पी> <पी> If the connection is closed, the client can always get a response from the chat history using the refresh_token समापन बिंदु. <पी> In worker.src.redis create a new file named producer.py , and add a Producer class similar to what we had on the chat web server:
class Producer:
def __init__(self, redis_client):
self.redis_client = redis_client
async def add_to_stream(self, data: dict, stream_channel) -> bool:
msg_id = await self.redis_client.xadd(name=stream_channel, id="*", fields=data)
print(f"Message id {msg_id} added to {stream_channel} stream")
return msg_id
<पी> Next, in the main.py file, update the main function to initialize the producer, create a stream data, and send the response to a response_channel using the add_to_stream method: from src.redis.config import Redis
import asyncio
from src.model.gptj import GPT
from src.redis.cache import Cache
from src.redis.config import Redis
from src.redis.stream import StreamConsumer
import os
from src.schema.chat import Message
from src.redis.producer import Producer
redis = Redis()
async def main():
json_client = redis.create_rejson_connection()
redis_client = await redis.create_connection()
consumer = StreamConsumer(redis_client)
cache = Cache(json_client)
producer = Producer(redis_client)
print("Stream consumer started")
print("Stream waiting for new messages")
while True:
response = await consumer.consume_stream(stream_channel="message_channel", count=1, block=0)
if response:
for stream, messages in response:
# Get message from stream, and extract token, message data and message id
for message in messages:
message_id = message[0]
token = [k.decode('utf-8')
for k, v in message[1].items()][0]
message = [v.decode('utf-8')
for k, v in message[1].items()][0]
# Create a new message instance and add to cache, specifying the source as human
msg = Message(msg=message)
await cache.add_message_to_cache(token=token, source="human", message_data=msg.dict())
# Get chat history from cache
data = await cache.get_chat_history(token=token)
# Clean message input and send to query
message_data = data['messages'][-4:]
input = ["" + i['msg'] for i in message_data]
input = " ".join(input)
res = GPT().query(input=input)
msg = Message(
msg=res
)
stream_data = {}
stream_data[str(token)] = str(msg.dict())
await producer.add_to_stream(stream_data, "response_channel")
await cache.add_message_to_cache(token=token, source="bot", message_data=msg.dict())
# Delete messaage from queue after it has been processed
await consumer.delete_message(stream_channel="message_channel", message_id=message_id)
if __name__ == "__main__":
asyncio.run(main())
<पी> Next, we need to let the client know when we receive responses from the worker in the /chat socket endpoint. We do this by listening to the response stream. We do not need to include a while loop here as the socket will be listening as long as the connection is open. <पी> Note that we also need to check which client the response is for by adding logic to check if the token connected is equal to the token in the response. Then we delete the message in the response queue once it's been read. <पी> In server.src.redis create a new file named stream.py and add our StreamConsumer class like this: from .config import Redis
class StreamConsumer:
def __init__(self, redis_client):
self.redis_client = redis_client
async def consume_stream(self, count: int, block: int, stream_channel):
response = await self.redis_client.xread(
streams={stream_channel: '0-0'}, count=count, block=block)
return response
async def delete_message(self, stream_channel, message_id):
await self.redis_client.xdel(stream_channel, message_id)
<पी> Next, update the /chat socket endpoint like so: from ..redis.stream import StreamConsumer
@chat.websocket("/chat")
async def websocket_endpoint(websocket: WebSocket, token: str = Depends(get_token)):
await manager.connect(websocket)
redis_client = await redis.create_connection()
producer = Producer(redis_client)
json_client = redis.create_rejson_connection()
consumer = StreamConsumer(redis_client)
try:
while True:
data = await websocket.receive_text()
stream_data = {}
stream_data[str(token)] = str(data)
await producer.add_to_stream(stream_data, "message_channel")
response = await consumer.consume_stream(stream_channel="response_channel", block=0)
print(response)
for stream, messages in response:
for message in messages:
response_token = [k.decode('utf-8')
for k, v in message[1].items()][0]
if token == response_token:
response_message = [v.decode('utf-8')
for k, v in message[1].items()][0]
print(message[0].decode('utf-8'))
print(token)
print(response_token)
await manager.send_personal_message(response_message, websocket)
await consumer.delete_message(stream_channel="response_channel", message_id=message[0].decode('utf-8'))
except WebSocketDisconnect:
manager.disconnect(websocket)
Refresh Token
<पी> Finally, we need to update the /refresh_token endpoint to get the chat history from the Redis database using our Cache class. पी> <पी> In server.src.redis , add a cache.py file and add the code below:
from rejson import Path
class Cache:
def __init__(self, json_client):
self.json_client = json_client
async def get_chat_history(self, token: str):
data = self.json_client.jsonget(
str(token), Path.rootPath())
return data
<पी> Next, in server.src.routes.chat.py import the Cache class and update the /token endpoint to the below:
from ..redis.cache import Cache
@chat.get("/refresh_token")
async def refresh_token(request: Request, token: str):
json_client = redis.create_rejson_connection()
cache = Cache(json_client)
data = await cache.get_chat_history(token)
if data == None:
raise HTTPException(
status_code=400, detail="Session expired or does not exist")
else:
return data
<पी> Now, when we send a GET request to the /refresh_token endpoint with any token, the endpoint will fetch the data from the Redis database. पी> <पी> If the token has not timed out, the data will be sent to the user. Or it'll send a 400 response if the token is not found. How to Test the Chat with multiple Clients in Postman
<पी> Finally, we will test the chat system by creating multiple chat sessions in Postman, connecting multiple clients in Postman, and chatting with the bot on the clients. पी> <पी> Lastly, we will try to get the chat history for the clients and hopefully get a proper response. <पी>
Recap
<पी> Let's have a quick recap as to what we have achieved with our chat system. The chat client creates a token for each chat session with a client. This token is used to identify each client, and each message sent by clients connected to or web server is queued in a Redis channel (message_chanel), identified by the token. <पी> Our worker environment reads from this channel. It does not have any clue who the client is (except that it's a unique token) and uses the message in the queue to send requests to the Huggingface inference API. <पी> When it gets a response, the response is added to a response channel and the chat history is updated. The client listening to the response_channel immediately sends the response to the client once it receives a response with its token. <पी> If the socket is still open, this response is sent. If the socket is closed, we are certain that the response is preserved because the response is added to the chat history. The client can get the history, even if a page refresh happens or in the event of a lost connection. <पी> Congratulations on getting this far! You have been able to build a working chat system. पी> <पी> In follow-up articles, I will focus on building a chat user interface for the client, creating unit and functional tests, fine-tuning our worker environment for faster response time with WebSockets and asynchronous requests, and ultimately deploying the chat application on AWS. <पी> This Article is part of a series on building full-stack intelligent chatbots with tools like Python, React, Huggingface, Redis, and so on. You can follow the full series on my blog:blog.stephensanwo.dev - AI ChatBot Series** <पी> You can download the full repository on My Github Repository पी> <पी> I wrote this tutorial in collaboration with Redis. Need help getting started with Redis? Try the following resources: - Try Redis Cloud free of charge
- Watch this video on the benefits of Redis Cloud over other Redis providers
- Redis Developer Hub - tools, guides, and tutorials about Redis
- RedisInsight Desktop GUI
<पी> मुफ़्त में कोड करना सीखें. फ्रीकोडकैंप के ओपन सोर्स पाठ्यक्रम ने 40,000 से अधिक लोगों को डेवलपर्स के रूप में नौकरी पाने में मदद की है। आरंभ करें