Computer >> कंप्यूटर ट्यूटोरियल >  >> प्रोग्रामिंग >> Redis

रेडिस सूचियों और टाइपस्क्रिप्ट के साथ एक कस्टम संदेश कतार बनाएं

<पी> क्या आपने कभी अपनी स्वयं की संदेश कतार बनाने का प्रयास किया है लेकिन चुनौतियों का सामना करना पड़ा है? यदि हां, तो आप अकेले नहीं हैं। इस ट्यूटोरियल में, हम रेडिस सूचियों का उपयोग करके स्क्रैच से एक संदेश कतार बनाने जा रहे हैं। हालांकि रेडिस के साथ संदेश कतार बनाने के कई तरीके हैं, जैसे स्ट्रीम, सूचियां और पब/सब्स, हम सबसे सरल और सबसे सीधे दृष्टिकोण पर ध्यान केंद्रित करेंगे:सूचियां। जैसे ही हम इस व्यावहारिक मार्गदर्शिका में गहराई से उतरेंगे, मेरे साथ जुड़ें।

हम क्या उपयोग करेंगे

  • अपस्टैश
  • हाथों की जोड़ी

आपको क्या चाहिए

  • बन
  • हाथों की जोड़ी

अपस्टैश रेडिस की स्थापना

<पी> सबसे पहले, आइए एक Redis इंस्टेंस सेट करें। ऐसा करने के लिए, बस अपस्टैश पर जाएं और डेटाबेस बनाएं पर क्लिक करें .उसके बाद, अपनी कनेक्शन स्ट्रिंग ढूंढने के लिए नीचे स्क्रॉल करें, जिसका उपयोग हम अपने क्लाइंट को कनेक्ट करने के लिए करेंगे। मैं यहां विवरण में नहीं जाऊंगा, लेकिन शुरुआत करने के लिए आपको मूल रूप से यही चाहिए।

उदाहरण कनेक्शन स्ट्रिंग:

redis://XXXXe@social-XXX-39281.upstash.io:39281

प्रोजेक्ट किकऑफ़

<पी> आइए बन का उपयोग करके अपना टाइपस्क्रिप्ट प्रोजेक्ट शुरू करें। चुनाव सिर्फ इसलिए नहीं है क्योंकि यह नोड से तेज़ है - इसे स्थापित करना भी बहुत आसान है। और हाँ, यह प्रभावशाली रूप से तेज़ भी है! 🚀

mkdir upstash-mq
cd upstash-mq
 
bun init
> package name (upstash-mq-tutorial): upstash-mq
> entry point (index.ts):
> Done!
 
bun add ioredis

प्रोजेक्ट संरचना

 ┣ 📂src
 ┃ ┣ 📂lua-scripts
 ┃ ┃ ┣ 📜add-job.lua
 ┃ ┃ ┗ 📜remove-job.lua
 ┃ ┣ 📜index.ts
 ┃ ┣ 📜job.ts
 ┃ ┣ 📜queue.ts
 ┃ ┗ 📜utils.ts
 ┣ 📜.env
 ┣ 📜.gitignore
 ┣ 📜README.md
 ┣ 📜bun.lockb
 ┣ 📜index.ts
 ┣ 📜package.json
 ┗ 📜tsconfig.json
<पी> हमारे Queue का दृश्य प्रतिनिधित्व ऐसा होगा:

<पी> रेडिस सूचियों और टाइपस्क्रिप्ट के साथ एक कस्टम संदेश कतार बनाएं

नौकरी

<पी> हमारी जॉब क्लास को कुछ प्रमुख चीज़ों की आवश्यकता है। सबसे पहले, हमें प्रत्येक कार्य की स्थिति को ट्रैक करने की आवश्यकता है। इससे हमें यह तय करने में मदद मिलती है कि क्या उन्हें संसाधित करना है, उन्हें फिर से प्रयास करना है, या यदि वे पहले ही समाप्त हो चुके हैं तो उन्हें कहीं और स्थानांतरित करना है। प्रत्येक कार्य में एक आईडी और कुछ डेटा भी होता है, जिसे सामान्य होना आवश्यक है ताकि हम एक शानदार उपयोगकर्ता अनुभव प्रदान कर सकें। अंत में, हमें प्रत्येक कार्य को उसकी कतार से जोड़ना होगा और आसान प्रबंधन के लिए कतार का नाम शामिल करना होगा।

<पी> यहां हमारे Job की रीढ़ हैं कक्षा:

type OwnerQueue = {
 redis: Redis;
 queueName: string;
};
export type JobStatuses =
 | "created"
 | "waiting"
 | "active"
 | "succeeded"
 | "failed";
 
export class Job<T> {
 id: string;
 status: JobStatuses;
 config: OwnerQueue;
 data: T;
 
 constructor(ownerConfig: OwnerQueue, data: T, jobId = randomUUID()) {
 this.id = jobId;
 this.status = "created";
 this.data = data;
 this.config = ownerConfig;
 }
}
<पी> data बनाने के लिए सामान्य, हमें पहले Job बनाना होगा स्वयं सामान्य. बाकी सब सीधे तरीके से होता है। हम प्रत्येक Job के लिए एक अलग Redis उदाहरण बना सकते हैं तत्व, लेकिन इसे प्रबंधित करना जटिल होगा।

<पी> सौभाग्य से, हमारा दृष्टिकोण कतार के भीतर रेडिस इंस्टेंस के आसान कॉन्फ़िगरेशन की अनुमति देता है, और हम आवश्यकतानुसार इस इंस्टेंस को आसानी से पास कर सकते हैं। यही सिद्धांत queueName पर लागू होता है . चूँकि हम अक्सर नौकरियों को कतारों में सहेजने के लिए इसका उपयोग करेंगे, हमारी नौकरियों को उनकी मूल कतारों के बारे में पता होना चाहिए। नौकरियों को कतारों में सहेजने में सक्षम होने के लिए, हमें दो चीजों की आवश्यकता है:रेडिस और कुछ उपयोगिताओं के साथ बातचीत करने के लिए एक लुआ स्क्रिप्ट।

<पी> आइए पहले अपनी उपयोगिताएँ बनाएँ:

import { JobStatuses } from "./job";
 
const MQ_PREFIX = "UpstashMQ";
 
export const formatMessageQueueKey = (queueName: string, key: string) => {
 return `${MQ_PREFIX}:${queueName}:${key}`;
};
 
export const convertToJSONString = <T>(
 data: T,
 status: JobStatuses,
): string => {
 return JSON.stringify({
 data,
 status,
 });
};
<पी> चूंकि हर बार जब हम रेडिस का उपयोग करते हैं तो मैन्युअल रूप से अपना कतार नाम बनाना आदर्श नहीं है, हमने formatMessageQueueKey नामक एक उपयोगिता बनाई है .यह उपयोगिता बस स्ट्रिंग्स को एक साथ जोड़ती है। इसके अतिरिक्त, हमें Redis में संग्रहीत करने के लिए अपने डेटा को क्रमबद्ध करने की आवश्यकता है - हम केवल JS ऑब्जेक्ट को डेटा स्रोत के रूप में पास नहीं कर सकते हैं; उन्हें पहले स्ट्रिंग्स में परिवर्तित करने की आवश्यकता है। यह देखते हुए कि डेटा सामान्य है, हमने एक सामान्य फ़ंक्शन लागू किया है, convertToJSONString , इस उद्देश्य के लिए.

<पी> अब, आइए अपनी पहली लुआ स्क्रिप्ट जोड़ें:

add-job.lua

--[[
key 1 -> [prefix]:name:jobs
key 2 -> [prefix]:name:waiting
arg 1 -> job id
arg 2 -> job data
]]
 
 
local jobId = ARGV[1]
local payload = ARGV[2]
 
if redis.call("hexists", KEYS[1], jobId) == 1 then return nil end
redis.call("hset", KEYS[1], jobId, payload)
redis.call("lpush", KEYS[2], jobId)
 
return jobId
<पी> हम इन कॉलों को अपने रेडिस इंस्टेंस के साथ व्यक्तिगत रूप से निम्नानुसार निष्पादित कर सकते हैं:

  • redis.hexists(jobId)
  • redis.hset(jobId,payload)
  • redis.lpush(jobId,payload)
<पी> हालाँकि, इस दृष्टिकोण के परिणामस्वरूप तीन अलग-अलग कॉलें होंगी। रेडिस सर्वर पर राउंड ट्रिप को कम करने के लिए, हमारा लक्ष्य पूरी प्रक्रिया को एक ही कॉल में समेकित करना है।

आइए अपना save() जोड़ें विधि

 private createQueueKey(key: string) {
 return formatMessageQueueKey(this.config.queueName, key);
 }
 
 async save(): Promise<string | null> {
 const addJobToQueueScript = await Bun.file("./src/lua-scripts/add-job.lua").text();
 const resJobId = (await this.config.redis.eval(
 addJobToQueueScript,
 2,
 this.createQueueKey("jobs"),
 this.createQueueKey("waiting"),
 this.id,
 convertToJSONString(this.data, this.status)
 )) as string | null;
 
 if (resJobId) {
 this.id = resJobId;
 return resJobId;
 }
 return null;
 }
<पी> कोड सीधा है, लेकिन मुझे और स्पष्ट करने दीजिए। अपनी लुआ स्क्रिप्ट बनाने के बाद, हम इसे redis.eval का उपयोग करके कॉल करते हैं , जो लुआ स्क्रिप्ट निष्पादित करने के लिए आवश्यक है। redis.eval के लिए पैरामीटर इस प्रकार हैं:

  • पहले पैरामीटर के लिए स्क्रिप्ट की आवश्यकता होती है।
  • दूसरा पैरामीटर तर्कों की संख्या निर्दिष्ट करता है।
  • तीसरा और चौथा पैरामीटर कुंजियों के लिए हैं।
  • अंत में, हम वास्तविक तर्क पारित करते हैं।
<पी> इससे पहले कि हम अपना Job ख़त्म करें आइए भविष्य के लिए कुछ और तरीके जोड़ें।

fromId = async <T>(jobId: string): Promise<Job<T> | null> => {
 const jobData = await this.config.redis.hget(this.createQueueKey("jobs"), jobId);
 if (jobData) {
 return this.fromData<T>(jobId, jobData);
 }
 return null;
 };
 
private fromData = <T>(jobId: string, stringifiedJobData: string): Job<T> => {
 const parsedData = JSON.parse(stringifiedJobData) as Job<T>;
 const job = new Job<T>(this.config, parsedData.data, jobId);
 job.status = parsedData.status;
 return job;
};
<पी> अभी, हमें इन कार्यों की आवश्यकता नहीं हो सकती है, लेकिन जब हम भविष्य में नौकरियों का प्रसंस्करण शुरू करेंगे तो ये महत्वपूर्ण हो जाएंगे। उस समय, हमारे पास केवल कार्य की आईडी (jobId) होगी , और हमें अपनी नौकरियों को नए सिरे से बनाने के लिए एक विधि की आवश्यकता होगी। बिल्कुल यही fromId है पूरा करता है. यह रेडिस से जॉब डेटा पुनर्प्राप्त करता है, इसे जॉब इंस्टेंस में परिवर्तित करता है, और इसे वापस कर देता है, ताकि कतार बाद में इस जॉब को संसाधित कर सके।

कतार पर आगे बढ़ें

<पी> save() पूरा करने के बाद भाग, आइए कतार के विवरण में उतरें। यहां हमारे उद्देश्य हैं:

  • हमारा लक्ष्य है कि हमारी कतार सफलता या विफलता पर डेटा को बनाए रखे या हटा दे, क्योंकि ऐसे मामले भी हो सकते हैं जहां बाद में पुन:प्रसंस्करण की आवश्यकता होती है।
  • हमारा इरादा समवर्तीता के लिए कतार को डिज़ाइन करने का है, जिससे कई कार्य एक साथ चल सकें।
  • हमारा लक्ष्य डेटा प्रोसेसिंग के लिए कॉलबैक फ़ंक्शन को पारित करने की अनुमति देना है। बेहतर डेवलपर अनुभव के लिए इस फ़ंक्शन को नौकरी के प्रकार का अनुमान लगाना चाहिए।
  • हम Job.save() पर कॉलिंग सक्षम करने की योजना बना रहे हैं कतार के भीतर से, हमें रेडिस इंस्टेंस और queueName पास करने की अनुमति मिलती है .
  • अंत में, हम यदि आवश्यक हो तो कतार को नष्ट करने और कतार से नौकरी को हटाने की क्षमता सुनिश्चित करना चाहते हैं।

आइए अपनी कतार को परिभाषित करने से शुरुआत करें

export type QueueConfig = {
 redis: Redis;
 queueName: string;
 keepOnSuccess?: boolean;
 keepOnFailure?: boolean;
};
 
export class Queue extends EventEmitter {
 config: QueueConfig;
 concurrency = 0;
 worker: any;
 running = 0;
 queued = 0;
 
 constructor(config: QueueConfig) {
 super();
 this.config = {
 redis: config.redis,
 queueName: config.queueName,
 keepOnFailure: config.keepOnFailure ?? true,
 keepOnSuccess: config.keepOnSuccess ?? true,
 };
 }
 
 createQueueKey(key: string) {
 return formatMessageQueueKey(this.config.queueName, key);
 }
}
<पी> क्यू वर्ग में बाहरी जानकारी को स्वीकार करने के लिए एक कॉन्फिगरेशन शामिल है, जैसे कतार का नाम, रेडिस इंस्टेंस और डेटा को बनाए रखने या हटाने के लिए सेटिंग्स। हम उपयोगकर्ताओं द्वारा पसंद किए जाने वाले किसी भी रेडिस कार्यान्वयन का स्वागत करते हैं, हालांकि हमारे पास अपस्टैश के लिए एक विशेष प्राथमिकता है। यह लचीलापन उपयोगकर्ताओं को कतार को अपने मौजूदा सिस्टम में आसानी से एकीकृत करने की अनुमति देता है।

<पी> और, हमारी कक्षा कुछ घटित होने पर उन्हें सूचित करने के लिए इवेंट एमिटर का विस्तार करती है।

यहां आरंभीकरण का एक उदाहरण दिया गया है:

const queue = new Queue({
 redis: new Redis(process.env.UPSTASH_REDIS_URL),
 queueName: "upstash-rocks",
 keepOnFailure: true,
 keepOnSuccess: true,
});

जोड़ें

async add<T>(payload: T) {
 return new Job<T>(this.config, payload).save();
 }
<पी> हमारा काम मूल कतार के कॉन्फ़िगरेशन विवरण और पेलोड लेता है, जो कि डेटा है जिसे रेडिस में संग्रहीत किया जाएगा। फिर हम इसे आसानी से सहेज लेते हैं।

<पी> अब हम ऐसा कर सकते हैं:

const queue = new Queue({
 redis: new Redis(process.env.UPSTASH_REDIS_URL!),
 queueName: "mytest-queue",
 keepOnFailure: true,
 keepOnSuccess: true,
});
 
const payload = {
 upstash: "best-redis-ever",
};
 
await queue.add(payload);
<पी> अब, हमें उन्हें संसाधित करने का एक तरीका चाहिए।

प्रसंस्करण

<पी> यह हमारी कतार कार्यान्वयन का सबसे चुनौतीपूर्ण हिस्सा है। हमें अपने उपयोगकर्ताओं से समवर्ती प्रक्रियाओं की संख्या निर्दिष्ट करने और एक कार्यकर्ता प्रदान करने की आवश्यकता है - कार्यों को संसाधित करने के लिए एक कॉलबैक फ़ंक्शन जो कार्य के प्रकार का अनुमान लगाएगा। इसके अतिरिक्त, हमें वर्तमान में चल रही और कतारबद्ध नौकरियों की संख्या को ट्रैक करने के लिए एक तंत्र की आवश्यकता है, जिससे हम कतार से अगली नौकरी को सुरक्षित रूप से चुनने में सक्षम हो सकें।

 async process<TJobPayload>(
 worker: (job: TJobPayload) => void,
 concurrency: number
 ): Promise<void> {
 this.concurrency = concurrency;
 this.worker = worker;
 this.running = 0;
 this.queued = 1;
 
 this.jobTick();
 }
<पी> जेनेरिक TJobPayload को स्वीकार करने का प्राथमिक उद्देश्य हमारे उपयोगकर्ताओं के लिए डेवलपर अनुभव को बेहतर बनाना है। हमारा लक्ष्य उन्हें हमारी कतार का उपयोग करते समय इंटेलिजेंस से लाभ उठाने में सक्षम बनाना है। उपयोगकर्ताओं को पता है कि उन्होंने {hello: "world"} जैसे डेटा संग्रहीत किया है नौकरी में, लेकिन सटीक इंटेलिजेंस प्रदान करने के लिए टाइपस्क्रिप्ट को सहायता की आवश्यकता होती है। यही कारण है कि हमारे पास टाइपस्क्रिप्ट को सूचित करने और बेहतर डेवलपर अनुभव के लिए अनुमान लगाने के लिए मजबूर करने के लिए यह तंत्र है।

<पी> jobTick() पर आगे बढ़ने से पहले , आइए इस प्रक्रिया पर ध्यानपूर्वक विचार करें:

  • चूंकि हमारी कतार FIFO (फर्स्ट इन फर्स्ट आउट) के आधार पर संचालित होती है, इसलिए हमें कतार के दाईं ओर से एक कार्य निकालकर शुरुआत करनी होगी।
  • इसके बाद, हम इस कार्य पर अपना कार्यकर्ता कार्य चलाते हैं।
  • कार्य पूरा होने के बाद, हम अपने उपयोगकर्ता को परिणाम भेजते हैं।
  • अंत में, हम jobTick() पर कॉल करते हैं अगले कार्य को संसाधित करने के लिए फिर से।
<पी> इसलिए, jobTick() इसमें ये तीन महत्वपूर्ण भाग शामिल होंगे।"

private jobTick() {
 this.getNextJob()
 .then(async (jobId) => {
 this.running += 1;
 this.queued -= 1;
 if (this.running + this.queued < this.concurrency) {
 this.queued += 1;
 setImmediate(this.jobTick);
 }
 
 if (!jobId) {
 return;
 }
 
 const jobCreatedById = await new Job(this.config, null).fromId(jobId);
 if (jobCreatedById) {
 await this.executeJob(jobCreatedById);
 } else {
 console.error(`Job not found with ID: ${jobId}`);
 }
 })
 .catch((error) => {
 console.error("Error in jobTick:", error);
 })
 .finally(() => {
 setImmediate(() => this.jobTick());
 });
 }
<पी> मैं कार्य दर कार्य समझाने का प्रयास करूँगा। आइए getNextJob() से शुरू करें
 private async getNextJob() {
 try {
 const jobId = await this.config.redis.brpoplpush(
 this.createQueueKey("waiting"),
 this.createQueueKey("active"),
 0
 );
 return jobId;
 } catch (error) {
 console.error("Error fetching the next job:", error);
 throw error;
 }
 }
 
<पी> हम बस रेडिस को कॉल कर रहे हैं, लेकिन एक रणनीतिक दृष्टिकोण के साथ:हम lpush के साथ संयुक्त रूप से एक ब्लॉकिंग कॉल का उपयोग करते हैं। राउंड ट्रिप की संख्या को कम करने के लिए। ब्लॉकिंग कॉल का उपयोग जानबूझकर किया जाता है; हम अन्य श्रमिकों को एक ही कार्य को एक साथ करने से रोकना चाहते हैं, इस प्रकार दौड़ की स्थिति से बचना चाहते हैं। इसके अतिरिक्त, हम नौकरियों को 'प्रतीक्षा' स्थिति से 'सक्रिय' स्थिति में ले जाते हैं, और उन्हें प्रक्रिया के अगले चरणों के लिए प्रभावी ढंग से तैयार करते हैं।

this.getNextJob().then(async (jobId) => {
 this.running += 1;
 this.queued -= 1;
 if (this.running + this.queued < this.concurrency) {
 this.queued += 1;
 setImmediate(this.jobTick);
 }
 
 if (!jobId) {
 return;
 }
 
 const jobCreatedById = await new Job(this.config, null).fromId(jobId);
 if (jobCreatedById) {
 await this.executeJob(jobCreatedById);
 } else {
 console.error(`Job not found with ID: ${jobId}`);
 }
});
<पी> अब हमारे पास jobId है , हम चालू नौकरियों की संख्या एक से बढ़ाते हैं और कतारबद्ध नौकरियों की संख्या एक से कम करते हैं। हम समवर्ती सीमा का सम्मान करते हुए यथासंभव नई नौकरियां शुरू करने का भी प्रयास करते हैं:

if (this.running + this.queued < this.concurrency) {
 this.queued += 1;
 setImmediate(this.jobTick);
}
<पी> यदि सब कुछ ठीक रहा, तो हम अपना Job बनाने के लिए आगे बढ़ेंगे fromId का उपयोग करके .एक बार जब हम अपने कार्य को उसकी आईडी द्वारा सफलतापूर्वक पुनर्निर्मित कर लेते हैं, तो हम अपने वर्कर फ़ंक्शन के साथ कार्य को निष्पादित करने के लिए आगे बढ़ते हैं।

<पी> चलिए executeJob पर चलते हैं
private async executeJob<TJobPayload>(jobCreatedById: Job<TJobPayload>) {
 let hasError = false;
 try {
 await this.worker(jobCreatedById.data);
 this.running -= 1;
 this.queued += 1;
 } catch (error) {
 hasError = true;
 } finally {
 const [jobStatus, job] = await this.finishJob<TJobPayload>(jobCreatedById, hasError);
 this.emit(jobStatus, job.id);
 return;
 }
 }
<पी> अब जब हमारे पास जॉब का डेटा है, तो हम इस डेटा को अपने worker पर भेज देते हैं . यदि यह सफलतापूर्वक निष्पादित होता है, तो हम कतारबद्ध नौकरियों की संख्या में एक की वृद्धि करते हैं और चल रही नौकरियों की संख्या में एक की कमी करते हैं। यह कदम महत्वपूर्ण है; यदि इसे सही ढंग से नहीं संभाला गया, तो यह एक साथ नई नौकरियां लॉन्च करने की हमारी क्षमता को प्रभावित कर सकता है। कार्यकर्ता के निष्पादन के दौरान किसी त्रुटि के मामले में, हम बस hasError को स्विच करते हैं झंडा. अंत में, हम finishJob पर कॉल करते हैं हमारे jobCreatedById के साथ और hasError jobId के साथ स्थिति को ध्वजांकित करें और उत्सर्जित करें .

<पी> साइड नोट:उपयोगकर्ता अब इस तरह से उत्सर्जित अपडेट सुन सकते हैं।

queue.on("succeeded", (jobId) => console.log("Succeeded jobId", jobId));
<पी> चलिए finishJob पर चलते हैं
private async finishJob<TJobPayload>(
 job: Job<TJobPayload>,
 hasFailed?: boolean
 ): Promise<[JobStatuses, Job<TJobPayload>]> {
 const multi = this.config.redis.multi();
 
 multi.lrem(this.createQueueKey("active"), 0, job.id);
 
 if (hasFailed) {
 if (this.config.keepOnFailure) {
 multi.hset(this.createQueueKey("jobs"), job.id, convertToJSONString(job.data, job.status));
 multi.sadd(this.createQueueKey("failed"), job.id);
 } else {
 multi.hdel(this.createQueueKey("jobs"), job.id);
 }
 job.status = "failed";
 } else {
 if (this.config.keepOnSuccess) {
 multi.hset(this.createQueueKey("jobs"), job.id, convertToJSONString(job.data, job.status));
 multi.sadd(this.createQueueKey("succeeded"), job.id);
 } else {
 multi.hdel(this.createQueueKey("jobs"), job.id);
 }
 job.status = "succeeded";
 }
 
 await multi.exec();
 return [job.status, job];
 }
<पी> यहां एक महत्वपूर्ण पहलू multi() का उपयोग है चूंकि हमारा उद्देश्य हमेशा राउंड ट्रिप को कम करना है। multi का उपयोग करके , Redis निष्पादन को तब तक स्थगित कर देता है जब तक हम exec() को कॉल नहीं करते .यदि उपयोगकर्ता ने keepOnFailure सेट किया है और keepOnSuccess डेटा को संरक्षित करने के लिए, हम दो सेट बनाएंगे:एक नौकरी के डेटा के साथ और दूसरा इस डेटा तक पहुंचने के लिए नौकरी आईडी की सूची के साथ। यह दृष्टिकोण सफलता और विफलता दोनों परिदृश्यों पर लागू होता है। स्वाभाविक रूप से, हम कार्य की स्थिति को तदनुसार समायोजित करते हैं। अंत में, हम exec के साथ मल्टी कमांड निष्पादित करते हैं और घटना उत्सर्जन प्रयोजनों के लिए कार्य की स्थिति और स्वयं कार्य लौटाएँ।

<पी> अंत में, हमारे पास दो शेष विधियाँ हैं, जिनके बारे में मैं विस्तार से नहीं बताऊँगा, क्योंकि वे उन अवधारणाओं का उपयोग करते हैं जिनसे हम पहले से ही परिचित हैं।

 async removeJob(jobId: string) {
 const addJobToQueueScript = await Bun.file("./src/lua-scripts/remove-job.lua").text();
 return await this.config.redis.eval(
 addJobToQueueScript,
 5,
 this.createQueueKey("succeeded"),
 this.createQueueKey("failed"),
 this.createQueueKey("waiting"),
 this.createQueueKey("active"),
 this.createQueueKey("jobs"),
 jobId
 );
 }
 
 async destroy() {
 const args = ["id", "jobs", "waiting", "active", "succeeded", "failed"].map((key) =>
 this.createQueueKey(key)
 );
 const res = await this.config.redis.del(...args);
 return res;
 }

remove-job.lua

--[[
key 1 -> [prefix]:test:succeeded
key 2 -> [prefix]:test:failed
key 3 -> [prefix]:test:waiting
key 4 -> [prefix]:test:active
key 5 -> [prefix]:test:jobs
arg 1 -> jobId
]]
 
local jobId = ARGV[1]
 
if (redis.call("sismember", KEYS[1], jobId) + redis.call("sismember", KEYS[2], jobId)) == 0 then
 redis.call("lrem", KEYS[3], 0, jobId)
 redis.call("lrem", KEYS[4], 0, jobId)
end
 
redis.call("srem", KEYS[1], jobId)
redis.call("srem", KEYS[2], jobId)
redis.call("hdel", KEYS[5], jobId)
 
<पी> destroy() पूरी कतार को पूरी तरह से मिटा देना है, और दूसरा कतार से किसी विशिष्ट कार्य को हटाना है।

आइए सब कुछ क्रियान्वित होते हुए देखें

import { sleep } from "bun";
import Redis from "ioredis";
 
import { Queue } from "./queue";
 
type Payload = {
 id: number;
 data: string;
};
 
const queue = new Queue({
 redis: new Redis(process.env.UPSTASH_REDIS_URL),
 queueName: "mytest-queue",
});
 
async function main() {
 await generateQueueItems(queue, 20);
 console.log("Sleep starting for 5 sec");
 await sleep(5000);
 
 queue.on("succeeded", (jobId) => console.log("Succeeded jobId", jobId));
 await queue.process<Payload>((job) => {
 console.log("Processing job:", job.data);
 sleep(1000);
 }, 3);
}
 
main();
 
async function generateQueueItems(queue: Queue, itemCount: number) {
 for (let i = 0; i < itemCount; i++) {
 const payload = {
 id: i,
 data: `dummy-data-${i}`,
 // Add more properties as needed for your testing
 };
 const jobId = await queue.add(payload);
 console.log(`Added item ${i} with jobId: ${jobId}`);
 }
}

बोनस चुनौतियाँ

  • रेडिस एक्सेस और वर्कर प्रक्रियाओं दोनों के लिए घातीय बैकऑफ़ के साथ पुनः प्रयास तर्क लागू करें।
  • 'कम से कम एक बार' गारंटी तंत्र विकसित करें।
  • बेहतर प्रदर्शन के लिए सर्विस वर्कर्स में कर्मचारियों को निष्पादित करने का प्रयास करें
  • निर्धारित नौकरियां जोड़ें

समाप्ति

<पी> किसी चीज़ को सीखने का सबसे अच्छा तरीका उसका निर्माण करना है, और इससे भी बेहतर तरीका अपस्टैश रेडिस का उपयोग करके ऐसा करना है। धमाल मचाते रहो.

<पी> 🔗प्रोजेक्ट का जीथब पता


  1. Matplotlib के साथ Boxplot कैसे बनाएं? Matplotlib के साथ Boxplot कैसे बनाएं?

    Matplotlib के साथ Boxplot बनाने के लिए, हम निम्नलिखित कदम उठा सकते हैं - फिगर साइज सेट करें और सबप्लॉट्स के बीच और आसपास पैडिंग को एडजस्ट करें। xticks . की सूची बनाएं । xticks . के साथ एक बॉक्सप्लॉट प्लॉट करें डेटा। xticks Set सेट करें और xtick लेबल 45° रोटेशन के साथ। आकृति प्रदर्शित

  1. पायथन प्रोग्राम में इनबिल्ट फ़ंक्शंस का उपयोग किए बिना अपर और लोअर केस कैरेक्टर गिनें पायथन प्रोग्राम में इनबिल्ट फ़ंक्शंस का उपयोग किए बिना अपर और लोअर केस कैरेक्टर गिनें

    इस लेख में, हम नीचे दिए गए समस्या कथन के समाधान के बारे में जानेंगे। समस्या कथन - हमें एक स्ट्रिंग दी गई है, हमें इनबिल्ट फ़ंक्शन का उपयोग किए बिना स्ट्रिंग में मौजूद अपरकेस और लोअरकेस वर्णों की संख्या गिनने की आवश्यकता है इसे अजगर में उपलब्ध islower () और isupper () फ़ंक्शन का उपयोग करके आसानी स

  1. मास्टर रिएक्ट नेटिव:2024 के लिए शीर्ष पाठ्यक्रम, पुस्तकें और संसाधन मास्टर रिएक्ट नेटिव:2024 के लिए शीर्ष पाठ्यक्रम, पुस्तकें और संसाधन

    सकारात्मक नौकरी दृष्टिकोण के साथ मोबाइल और सॉफ्टवेयर विकास एक संभावित आकर्षक करियर विकल्प है। यूएस ब्यूरो ऑफ लेबर स्टैटिस्टिक्स के अनुसार, इस क्षेत्र के पेशेवर प्रति वर्ष $109,020 का औसत वेतन कमाते हैं। बीएलएस ने 2021 से 2031 के बीच नौकरियों में 25 प्रतिशत की वृद्धि की भी भविष्यवाणी की है, जो अन्य स