आज की पोस्ट में, हम मनोरंजन के लिए एक भोले-भाले बैकग्राउंड प्रोसेसिंग सिस्टम को लागू करने जा रहे हैं! साइडकीक जैसे लोकप्रिय बैकग्राउंड प्रोसेसिंग सिस्टम के आंतरिक भाग में झांकने के दौरान हम कुछ चीजें सीख सकते हैं। इस मज़ा का उत्पाद किसी भी तरह से उत्पादन के उपयोग के लिए अभिप्रेत नहीं है।
आइए कल्पना करें कि हमारे आवेदन में एक कार्य है जो एक या अधिक वेबसाइटों को लोड करता है और उनके शीर्षक निकालता है। चूंकि इन वेबसाइटों के प्रदर्शन पर हमारा कोई प्रभाव नहीं है, इसलिए हम अपने मुख्य थ्रेड (या वर्तमान अनुरोध—अगर हम एक वेब एप्लिकेशन बना रहे हैं) के बाहर कार्य करना चाहते हैं, लेकिन पृष्ठभूमि में।पी>
कार्य को इनकैप्सुलेट करना
पृष्ठभूमि प्रसंस्करण में आने से पहले, आइए कार्य को करने के लिए एक सेवा वस्तु का निर्माण करें। हम शीर्षक टैग की सामग्री को निकालने के लिए OpenURI और Nokogiri का उपयोग करेंगे।
require 'open-uri'
require 'nokogiri'
class TitleExtractorService
def call(url)
document = Nokogiri::HTML(open(url))
title = document.css('html > head > title').first.content
puts title.gsub(/[[:space:]]+/, ' ').strip
rescue
puts "Unable to find a title for #{url}"
end
end
सेवा को कॉल करने से दिए गए URL का शीर्षक प्रिंट हो जाता है।
TitleExtractorService.new.call('https://appsignal.com')
# AppSignal: Application Performance Monitoring for Ruby on Rails and Elixir
यह अपेक्षा के अनुरूप काम करता है, लेकिन देखते हैं कि क्या हम इसे अन्य बैकग्राउंड प्रोसेसिंग सिस्टम की तरह दिखने और महसूस करने के लिए सिंटैक्स में थोड़ा सुधार कर सकते हैं। एक Magique::Worker
. बनाकर मॉड्यूल, हम सेवा वस्तु में कुछ वाक्यात्मक चीनी जोड़ सकते हैं।
module Magique
module Worker
def self.included(base)
base.extend(ClassMethods)
end
module ClassMethods
def perform_now(*args)
new.perform(*args)
end
end
def perform(*)
raise NotImplementedError
end
end
end
मॉड्यूल एक perform
जोड़ता है कार्यकर्ता उदाहरण के लिए विधि और एक perform_now
आह्वान को थोड़ा बेहतर बनाने के लिए कार्यकर्ता वर्ग के लिए विधि।
आइए मॉड्यूल को हमारी सेवा वस्तु में शामिल करें। जब हम इस पर काम कर रहे हों, तो चलिए इसका नाम बदलकर TitleExtractorWorker
कर देते हैं और call
बदलें perform
करने की विधि ।
class TitleExtractorWorker
include Magique::Worker
def perform(url)
document = Nokogiri::HTML(open(url))
title = document.css('html > head > title').first.content
puts title.gsub(/[[:space:]]+/, ' ').strip
rescue
puts "Unable to find a title for #{url}"
end
end
आह्वान का परिणाम अभी भी वही है, लेकिन यह थोड़ा स्पष्ट है कि क्या हो रहा है।
TitleExtractorWorker.perform_now('https://appsignal.com')
# AppSignal: Application Performance Monitoring for Ruby on Rails and Elixir
एसिंक्रोनस प्रोसेसिंग लागू करना
अब जब हमारे पास शीर्षक निष्कर्षण काम कर रहा है, तो हम पिछले रूबी मैजिक लेखों से सभी शीर्षक प्राप्त कर सकते हैं। ऐसा करने के लिए, मान लें कि हमारे पास RUBYMAGIC
है पिछले लेखों के सभी URL की सूची के साथ स्थिर।
RUBYMAGIC.each do |url|
TitleExtractorWorker.perform_now(url)
end
# Unraveling Classes, Instances and Metaclasses in Ruby | AppSignal Blog
# Bindings and Lexical Scope in Ruby | AppSignal Blog
# Building a Ruby C Extension From Scratch | AppSignal Blog
# Closures in Ruby: Blocks, Procs and Lambdas | AppSignal Blog
# ...
हमें पिछले लेखों के शीर्षक मिलते हैं, लेकिन उन सभी को निकालने में थोड़ा समय लगता है। ऐसा इसलिए है क्योंकि हम अगले अनुरोध पर जाने से पहले प्रत्येक अनुरोध के पूरा होने तक प्रतीक्षा करते हैं।
आइए एक perform_async
. की शुरुआत करके इसे सुधारें हमारे कार्यकर्ता मॉड्यूल के लिए विधि। चीजों को गति देने के लिए, यह प्रत्येक URL के लिए एक नया थ्रेड बनाता है।
module Magique
module Worker
module ClassMethods
def perform_async(*args)
Thread.new { new.perform(*args) }
end
end
end
end
आह्वान को TitleExtractorWorker.perform_async(url)
में बदलने के बाद , हमें लगभग एक ही बार में सभी शीर्षक मिल जाते हैं। हालांकि, इसका मतलब यह भी है कि हम एक बार में रूबी मैजिक ब्लॉग के लिए 20 से अधिक कनेक्शन खोल रहे हैं। (आपके ब्लॉग के साथ खिलवाड़ करने के लिए क्षमा करें, दोस्तों! )
यदि आप लंबे समय से चल रही प्रक्रिया (जैसे वेब सर्वर) के बाहर अपने स्वयं के कार्यान्वयन और इसका परीक्षण कर रहे हैं, तो loop { sleep 1 }
जैसा कुछ जोड़ना न भूलें यह सुनिश्चित करने के लिए कि प्रक्रिया तुरंत समाप्त न हो जाए, आपकी स्क्रिप्ट के अंत तक।
कार्य कतारबद्ध करना
प्रत्येक आमंत्रण के लिए एक नया धागा बनाने के दृष्टिकोण के साथ, हम अंततः संसाधन सीमा (हमारी तरफ और उन वेबसाइटों पर जो हम एक्सेस कर रहे हैं) तक पहुंच जाएंगे। जैसा कि हम अच्छे नागरिक बनना चाहते हैं, आइए कार्यान्वयन को किसी ऐसी चीज़ में बदल दें जो अतुल्यकालिक है लेकिन सेवा से इनकार करने जैसा महसूस नहीं होता है।
इस समस्या को हल करने का एक सामान्य तरीका निर्माता/उपभोक्ता पैटर्न का उपयोग करना है। एक या अधिक निर्माता कार्यों को एक कतार में धकेलते हैं जबकि एक या अधिक उपभोक्ता कतार से कार्य लेते हैं और उन्हें संसाधित करते हैं।
एक कतार मूल रूप से तत्वों की एक सूची है। सिद्धांत रूप में, एक साधारण सरणी काम करेगी। हालाँकि, जैसा कि हम संगामिति के साथ काम कर रहे हैं, हमें यह सुनिश्चित करने की आवश्यकता है कि एक समय में केवल एक निर्माता या उपभोक्ता ही कतार तक पहुँच सकता है। अगर हम इस बारे में सावधान नहीं हैं, तो चीजें अराजकता में समाप्त हो जाएंगी—जैसे दो लोग एक ही बार में एक दरवाजे को निचोड़ने की कोशिश कर रहे हैं।
इस समस्या को उत्पादक-उपभोक्ता समस्या के रूप में जाना जाता है और इसके कई समाधान हैं। सौभाग्य से, यह एक बहुत ही सामान्य समस्या है और रूबी एक उचित Queue
. के साथ जहाज करती है कार्यान्वयन जिसे हम थ्रेड सिंक्रनाइज़ेशन के बारे में चिंता किए बिना उपयोग कर सकते हैं।
इसका उपयोग करने के लिए, सुनिश्चित करें कि निर्माता और उपभोक्ता दोनों कतार तक पहुंच सकते हैं। हम अपने Magique
. में एक क्लास मेथड जोड़कर ऐसा करते हैं मॉड्यूल और Queue
का एक उदाहरण असाइन करना इसके लिए।
module Magique
def self.backend
@backend
end
def self.backend=(backend)
@backend = backend
end
end
Magique.backend = Queue.new
इसके बाद, हम अपना perform_async
. बदलते हैं एक कार्य को अपना नया धागा बनाने के बजाय कतार पर धकेलने के लिए कार्यान्वयन। एक कार्य को हैश के रूप में दर्शाया जाता है जिसमें कार्यकर्ता वर्ग के संदर्भ के साथ-साथ perform_async
को दिए गए तर्क शामिल हैं। विधि।
module Magique
module Worker
module ClassMethods
def perform_async(*args)
Magique.backend.push(worker: self, args: args)
end
end
end
end
इसके साथ, हम चीजों के निर्माता पक्ष के साथ कर रहे हैं। इसके बाद, उपभोक्ता पक्ष पर एक नजर डालते हैं।
प्रत्येक उपभोक्ता एक अलग धागा है जो कतार से कार्य लेता है और उन्हें निष्पादित करता है। एक कार्य के बाद रुकने के बजाय, जैसे कि धागा, उपभोक्ता फिर कतार से दूसरा कार्य लेता है और उसे करता है, और इसी तरह। यहां Magique::Processor
. नामक एक उपभोक्ता का बुनियादी कार्यान्वयन दिया गया है . प्रत्येक प्रोसेसर एक नया थ्रेड बनाता है जो अनंत रूप से लूप करता है। प्रत्येक पुनरावृत्ति के लिए, यह कतार से एक नया कार्य प्राप्त करने का प्रयास करता है, कार्यकर्ता वर्ग का एक नया उदाहरण बनाता है, और इसके perform
को कॉल करता है दिए गए तर्कों के साथ विधि।
module Magique
class Processor
def self.start(concurrency = 1)
concurrency.times { |n| new("Processor #{n}") }
end
def initialize(name)
thread = Thread.new do
loop do
payload = Magique.backend.pop
worker_class = payload[:worker]
worker_class.new.perform(*payload[:args])
end
end
thread.name = name
end
end
end
प्रोसेसिंग लूप के अलावा, हम Magique::Processor.start
नामक एक सुविधा विधि जोड़ते हैं . यह हमें एक साथ कई प्रोसेसर को स्पिन करने की अनुमति देता है। जबकि धागे का नामकरण वास्तव में आवश्यक नहीं है, यह हमें यह देखने की अनुमति देगा कि क्या चीजें वास्तव में अपेक्षित रूप से काम कर रही हैं।
आइए हमारे TitleExtractorWorker
. के आउटपुट को एडजस्ट करें वर्तमान थ्रेड का नाम शामिल करने के लिए।
puts "[#{Thread.current.name}] #{title.gsub(/[[:space:]]+/, ' ').strip}"
हमारे बैकग्राउंड प्रोसेसिंग सेटअप का परीक्षण करने के लिए, हमें अपने कार्यों को कतारबद्ध करने से पहले प्रोसेसर के एक सेट को स्पिन करना होगा।
Magique.backend = Queue.new
Magique::Processor.start(5)
RUBYMAGIC.each do |url|
TitleExtractorWorker.perform_async(url)
end
# [Processor 3] Bindings and Lexical Scope in Ruby | AppSignal Blog
# [Processor 4] Building a Ruby C Extension From Scratch | AppSignal Blog
# [Processor 1] Unraveling Classes, Instances and Metaclasses in Ruby | AppSignal Blog
# [Processor 0] Ruby's Hidden Gems, StringScanner | AppSignal Blog
# [Processor 2] Fibers and Enumerators in Ruby: Turning Blocks Inside Out | AppSignal Blog
# [Processor 4] Closures in Ruby: Blocks, Procs and Lambdas | AppSignal Blog
# ...
जब इसे चलाया जाता है, तब भी हमें सभी लेखों के शीर्षक मिलते हैं। हालांकि यह हर कार्य के लिए एक अलग थ्रेड का उपयोग करने जितना तेज़ नहीं है, फिर भी यह प्रारंभिक कार्यान्वयन की तुलना में तेज़ है जिसमें कोई पृष्ठभूमि प्रसंस्करण नहीं था। जोड़े गए प्रोसेसर नामों के लिए धन्यवाद, हम यह भी पुष्टि कर सकते हैं कि सभी प्रोसेसर कतार के माध्यम से काम कर रहे हैं। समवर्ती प्रोसेसर की संख्या में बदलाव करके, प्रसंस्करण गति और मौजूदा संसाधन सीमाओं के बीच संतुलन खोजना संभव है।
एकाधिक प्रक्रियाओं और मशीनों तक विस्तार
अब तक, हमारे बैकग्राउंड प्रोसेसिंग सिस्टम का वर्तमान कार्यान्वयन काफी अच्छा काम करता है। हालाँकि, यह अभी भी उसी प्रक्रिया तक सीमित है। संसाधन-भूखे कार्य अभी भी पूरी प्रक्रिया के प्रदर्शन को प्रभावित करेंगे। अंतिम चरण के रूप में, आइए कार्यभार को कई प्रक्रियाओं और यहां तक कि कई मशीनों में वितरित करने पर विचार करें।
कतार उत्पादकों और उपभोक्ताओं के बीच एकमात्र संबंध है। अभी, यह इन-मेमोरी कार्यान्वयन का उपयोग कर रहा है। आइए साइडकीक से अधिक प्रेरणा लें और रेडिस का उपयोग करके एक कतार लागू करें।
रेडिस के पास सूचियों के लिए समर्थन है जो हमें कार्यों को आगे बढ़ाने और लाने की अनुमति देता है। इसके अतिरिक्त, रेडिस रूबी मणि थ्रेड-सुरक्षित है और सूचियों को संशोधित करने के लिए रेडिस आदेश परमाणु हैं। ये गुण सिंक्रोनाइज़ेशन समस्याओं में भागे बिना हमारे एसिंक्रोनस बैकग्राउंड प्रोसेसिंग सिस्टम के लिए इसका उपयोग करना संभव बनाते हैं।
आइए एक रेडिस समर्थित कतार बनाएं जो push
. को लागू करता है और shift
Queue
. जैसी विधियाँ हमने पहले इस्तेमाल किया था।
require 'json'
require 'redis'
module Magique
module Backend
class Redis
def initialize(connection = ::Redis.new)
@connection = connection
end
def push(job)
@connection.lpush('magique:queue', JSON.dump(job))
end
def shift
_queue, job = @connection.brpop('magique:queue')
payload = JSON.parse(job, symbolize_names: true)
payload[:worker] = Object.const_get(payload[:worker])
payload
end
end
end
end
चूंकि रेडिस रूबी वस्तुओं के बारे में कुछ भी नहीं जानता है, हमें अपने कार्यों को डेटाबेस में संग्रहीत करने से पहले उन्हें lpush
का उपयोग करके JSON में क्रमबद्ध करना होगा। आदेश जो सूची के सामने एक तत्व जोड़ता है।
किसी कार्य को कतार से लाने के लिए, हम brpop
. का उपयोग कर रहे हैं कमांड, जो सूची से अंतिम तत्व प्राप्त करता है। यदि सूची खाली है, तो यह तब तक अवरुद्ध रहेगा जब तक कोई नया तत्व उपलब्ध नहीं हो जाता। जब कोई कार्य उपलब्ध न हो तो हमारे प्रोसेसर को रोकने का यह एक अच्छा तरीका है। अंत में, रेडिस से एक कार्य प्राप्त करने के बाद, हमें Object.const_get
का उपयोग करके कार्यकर्ता के नाम के आधार पर वास्तविक रूबी वर्ग को देखना होगा। ।
अंतिम चरण के रूप में, आइए चीजों को कई प्रक्रियाओं में विभाजित करें। चीजों के निर्माता पक्ष पर, हमें केवल इतना करना है कि बैकएंड को हमारी नई कार्यान्वित रेडिस कतार में बदल दें।
# ...
Magique.backend = Magique::Backend::Redis.new
RUBYMAGIC.each do |url|
TitleExtractorWorker.perform_async(url)
end
चीजों के उपभोक्ता पक्ष पर, हम इस तरह की कुछ पंक्तियों से दूर हो सकते हैं:
# ...
Magique.backend = Magique::Backend::Redis.new
Magique::Processor.start(5)
loop { sleep 1 }
निष्पादित होने पर, उपभोक्ता प्रक्रिया कतार में नए काम के आने की प्रतीक्षा करेगी। एक बार जब हम निर्माता प्रक्रिया शुरू करते हैं जो कार्यों को कतार में धकेलती है, तो हम देख सकते हैं कि वे तुरंत संसाधित हो जाते हैं।
जिम्मेदारी से आनंद लें और उत्पादन में इसका इस्तेमाल न करें
जबकि हमने इसे वास्तविक दुनिया के सेटअप से दूर रखा था जिसे आप उत्पादन में उपयोग करेंगे (इसलिए नहीं!), हमने पृष्ठभूमि प्रोसेसर बनाने में कुछ कदम उठाए। हमने एक प्रक्रिया को पृष्ठभूमि सेवा के रूप में चलाकर शुरू किया। फिर हमने इसे एसिंक्स बनाया और Queue
. का इस्तेमाल किया उत्पादक-उपभोक्ता समस्या का समाधान करना। फिर हमने मेमोरी में कार्यान्वयन के बजाय रेडिस का उपयोग करके प्रक्रिया को कई प्रक्रियाओं या मशीनों में विस्तारित किया।
जैसा कि पहले उल्लेख किया गया है, यह एक पृष्ठभूमि प्रसंस्करण प्रणाली का सरलीकृत कार्यान्वयन है। बहुत सी चीजें गायब हैं और स्पष्ट रूप से निपटा नहीं गया है। इनमें त्रुटि प्रबंधन, एकाधिक कतार, शेड्यूलिंग, कनेक्शन पूलिंग और सिग्नल प्रबंधन शामिल हैं (लेकिन इन्हीं तक सीमित नहीं हैं)।
फिर भी, हमें इसे लिखने में मज़ा आया और आशा है कि आपने बैकग्राउंड प्रोसेसिंग सिस्टम के हुड के नीचे एक झलक का आनंद लिया। शायद तुमने एक-दो चीजें भी छीन लीं।