Computer >> कंप्यूटर >  >> प्रोग्रामिंग >> Ruby

रूबी में एक बैकग्राउंड प्रोसेसिंग सिस्टम बिल्डिंग द्वारा सीखना

आज की पोस्ट में, हम मनोरंजन के लिए एक भोले-भाले बैकग्राउंड प्रोसेसिंग सिस्टम को लागू करने जा रहे हैं! साइडकीक जैसे लोकप्रिय बैकग्राउंड प्रोसेसिंग सिस्टम के आंतरिक भाग में झांकने के दौरान हम कुछ चीजें सीख सकते हैं। इस मज़ा का उत्पाद किसी भी तरह से उत्पादन के उपयोग के लिए अभिप्रेत नहीं है।

आइए कल्पना करें कि हमारे आवेदन में एक कार्य है जो एक या अधिक वेबसाइटों को लोड करता है और उनके शीर्षक निकालता है। चूंकि इन वेबसाइटों के प्रदर्शन पर हमारा कोई प्रभाव नहीं है, इसलिए हम अपने मुख्य थ्रेड (या वर्तमान अनुरोध—अगर हम एक वेब एप्लिकेशन बना रहे हैं) के बाहर कार्य करना चाहते हैं, लेकिन पृष्ठभूमि में।

कार्य को इनकैप्सुलेट करना

पृष्ठभूमि प्रसंस्करण में आने से पहले, आइए कार्य को करने के लिए एक सेवा वस्तु का निर्माण करें। हम शीर्षक टैग की सामग्री को निकालने के लिए OpenURI और Nokogiri का उपयोग करेंगे।

require 'open-uri'
require 'nokogiri'
 
class TitleExtractorService
  def call(url)
    document = Nokogiri::HTML(open(url))
    title = document.css('html > head > title').first.content
    puts title.gsub(/[[:space:]]+/, ' ').strip
  rescue
    puts "Unable to find a title for #{url}"
  end
end

सेवा को कॉल करने से दिए गए URL का शीर्षक प्रिंट हो जाता है।

TitleExtractorService.new.call('https://appsignal.com')
# AppSignal: Application Performance Monitoring for Ruby on Rails and Elixir

यह अपेक्षा के अनुरूप काम करता है, लेकिन देखते हैं कि क्या हम इसे अन्य बैकग्राउंड प्रोसेसिंग सिस्टम की तरह दिखने और महसूस करने के लिए सिंटैक्स में थोड़ा सुधार कर सकते हैं। एक Magique::Worker . बनाकर मॉड्यूल, हम सेवा वस्तु में कुछ वाक्यात्मक चीनी जोड़ सकते हैं।

module Magique
  module Worker
    def self.included(base)
      base.extend(ClassMethods)
    end
 
    module ClassMethods
      def perform_now(*args)
        new.perform(*args)
      end
    end
 
    def perform(*)
      raise NotImplementedError
    end
  end
end

मॉड्यूल एक perform जोड़ता है कार्यकर्ता उदाहरण के लिए विधि और एक perform_now आह्वान को थोड़ा बेहतर बनाने के लिए कार्यकर्ता वर्ग के लिए विधि।

आइए मॉड्यूल को हमारी सेवा वस्तु में शामिल करें। जब हम इस पर काम कर रहे हों, तो चलिए इसका नाम बदलकर TitleExtractorWorker कर देते हैं और callबदलें perform करने की विधि ।

class TitleExtractorWorker
  include Magique::Worker
 
  def perform(url)
    document = Nokogiri::HTML(open(url))
    title = document.css('html > head > title').first.content
    puts title.gsub(/[[:space:]]+/, ' ').strip
  rescue
    puts "Unable to find a title for #{url}"
  end
end

आह्वान का परिणाम अभी भी वही है, लेकिन यह थोड़ा स्पष्ट है कि क्या हो रहा है।

TitleExtractorWorker.perform_now('https://appsignal.com')
# AppSignal: Application Performance Monitoring for Ruby on Rails and Elixir

एसिंक्रोनस प्रोसेसिंग लागू करना

अब जब हमारे पास शीर्षक निष्कर्षण काम कर रहा है, तो हम पिछले रूबी मैजिक लेखों से सभी शीर्षक प्राप्त कर सकते हैं। ऐसा करने के लिए, मान लें कि हमारे पास RUBYMAGIC है पिछले लेखों के सभी URL की सूची के साथ स्थिर।

RUBYMAGIC.each do |url|
  TitleExtractorWorker.perform_now(url)
end
 
# Unraveling Classes, Instances and Metaclasses in Ruby | AppSignal Blog
# Bindings and Lexical Scope in Ruby | AppSignal Blog
# Building a Ruby C Extension From Scratch | AppSignal Blog
# Closures in Ruby: Blocks, Procs and Lambdas | AppSignal Blog
# ...

हमें पिछले लेखों के शीर्षक मिलते हैं, लेकिन उन सभी को निकालने में थोड़ा समय लगता है। ऐसा इसलिए है क्योंकि हम अगले अनुरोध पर जाने से पहले प्रत्येक अनुरोध के पूरा होने तक प्रतीक्षा करते हैं।

आइए एक perform_async . की शुरुआत करके इसे सुधारें हमारे कार्यकर्ता मॉड्यूल के लिए विधि। चीजों को गति देने के लिए, यह प्रत्येक URL के लिए एक नया थ्रेड बनाता है।

module Magique
  module Worker
    module ClassMethods
      def perform_async(*args)
        Thread.new { new.perform(*args) }
      end
    end
  end
end

आह्वान को TitleExtractorWorker.perform_async(url) में बदलने के बाद , हमें लगभग एक ही बार में सभी शीर्षक मिल जाते हैं। हालांकि, इसका मतलब यह भी है कि हम एक बार में रूबी मैजिक ब्लॉग के लिए 20 से अधिक कनेक्शन खोल रहे हैं। (आपके ब्लॉग के साथ खिलवाड़ करने के लिए क्षमा करें, दोस्तों! )

यदि आप लंबे समय से चल रही प्रक्रिया (जैसे वेब सर्वर) के बाहर अपने स्वयं के कार्यान्वयन और इसका परीक्षण कर रहे हैं, तो loop { sleep 1 } जैसा कुछ जोड़ना न भूलें यह सुनिश्चित करने के लिए कि प्रक्रिया तुरंत समाप्त न हो जाए, आपकी स्क्रिप्ट के अंत तक।

कार्य कतारबद्ध करना

प्रत्येक आमंत्रण के लिए एक नया धागा बनाने के दृष्टिकोण के साथ, हम अंततः संसाधन सीमा (हमारी तरफ और उन वेबसाइटों पर जो हम एक्सेस कर रहे हैं) तक पहुंच जाएंगे। जैसा कि हम अच्छे नागरिक बनना चाहते हैं, आइए कार्यान्वयन को किसी ऐसी चीज़ में बदल दें जो अतुल्यकालिक है लेकिन सेवा से इनकार करने जैसा महसूस नहीं होता है।

इस समस्या को हल करने का एक सामान्य तरीका निर्माता/उपभोक्ता पैटर्न का उपयोग करना है। एक या अधिक निर्माता कार्यों को एक कतार में धकेलते हैं जबकि एक या अधिक उपभोक्ता कतार से कार्य लेते हैं और उन्हें संसाधित करते हैं।

एक कतार मूल रूप से तत्वों की एक सूची है। सिद्धांत रूप में, एक साधारण सरणी काम करेगी। हालाँकि, जैसा कि हम संगामिति के साथ काम कर रहे हैं, हमें यह सुनिश्चित करने की आवश्यकता है कि एक समय में केवल एक निर्माता या उपभोक्ता ही कतार तक पहुँच सकता है। अगर हम इस बारे में सावधान नहीं हैं, तो चीजें अराजकता में समाप्त हो जाएंगी—जैसे दो लोग एक ही बार में एक दरवाजे को निचोड़ने की कोशिश कर रहे हैं।

इस समस्या को उत्पादक-उपभोक्ता समस्या के रूप में जाना जाता है और इसके कई समाधान हैं। सौभाग्य से, यह एक बहुत ही सामान्य समस्या है और रूबी एक उचित Queue . के साथ जहाज करती है कार्यान्वयन जिसे हम थ्रेड सिंक्रनाइज़ेशन के बारे में चिंता किए बिना उपयोग कर सकते हैं।

इसका उपयोग करने के लिए, सुनिश्चित करें कि निर्माता और उपभोक्ता दोनों कतार तक पहुंच सकते हैं। हम अपने Magique . में एक क्लास मेथड जोड़कर ऐसा करते हैं मॉड्यूल और Queue का एक उदाहरण असाइन करना इसके लिए।

module Magique
  def self.backend
    @backend
  end
 
  def self.backend=(backend)
    @backend = backend
  end
end
 
Magique.backend = Queue.new

इसके बाद, हम अपना perform_async . बदलते हैं एक कार्य को अपना नया धागा बनाने के बजाय कतार पर धकेलने के लिए कार्यान्वयन। एक कार्य को हैश के रूप में दर्शाया जाता है जिसमें कार्यकर्ता वर्ग के संदर्भ के साथ-साथ perform_async को दिए गए तर्क शामिल हैं। विधि।

module Magique
  module Worker
    module ClassMethods
      def perform_async(*args)
        Magique.backend.push(worker: self, args: args)
      end
    end
  end
end

इसके साथ, हम चीजों के निर्माता पक्ष के साथ कर रहे हैं। इसके बाद, उपभोक्ता पक्ष पर एक नजर डालते हैं।

प्रत्येक उपभोक्ता एक अलग धागा है जो कतार से कार्य लेता है और उन्हें निष्पादित करता है। एक कार्य के बाद रुकने के बजाय, जैसे कि धागा, उपभोक्ता फिर कतार से दूसरा कार्य लेता है और उसे करता है, और इसी तरह। यहां Magique::Processor . नामक एक उपभोक्ता का बुनियादी कार्यान्वयन दिया गया है . प्रत्येक प्रोसेसर एक नया थ्रेड बनाता है जो अनंत रूप से लूप करता है। प्रत्येक पुनरावृत्ति के लिए, यह कतार से एक नया कार्य प्राप्त करने का प्रयास करता है, कार्यकर्ता वर्ग का एक नया उदाहरण बनाता है, और इसके perform को कॉल करता है दिए गए तर्कों के साथ विधि।

module Magique
  class Processor
    def self.start(concurrency = 1)
      concurrency.times { |n| new("Processor #{n}") }
    end
 
    def initialize(name)
      thread = Thread.new do
        loop do
          payload = Magique.backend.pop
          worker_class = payload[:worker]
          worker_class.new.perform(*payload[:args])
        end
      end
 
      thread.name = name
    end
  end
end

प्रोसेसिंग लूप के अलावा, हम Magique::Processor.start नामक एक सुविधा विधि जोड़ते हैं . यह हमें एक साथ कई प्रोसेसर को स्पिन करने की अनुमति देता है। जबकि धागे का नामकरण वास्तव में आवश्यक नहीं है, यह हमें यह देखने की अनुमति देगा कि क्या चीजें वास्तव में अपेक्षित रूप से काम कर रही हैं।

आइए हमारे TitleExtractorWorker . के आउटपुट को एडजस्ट करें वर्तमान थ्रेड का नाम शामिल करने के लिए।

puts "[#{Thread.current.name}] #{title.gsub(/[[:space:]]+/, ' ').strip}"

हमारे बैकग्राउंड प्रोसेसिंग सेटअप का परीक्षण करने के लिए, हमें अपने कार्यों को कतारबद्ध करने से पहले प्रोसेसर के एक सेट को स्पिन करना होगा।

Magique.backend = Queue.new
Magique::Processor.start(5)
 
RUBYMAGIC.each do |url|
  TitleExtractorWorker.perform_async(url)
end
 
# [Processor 3] Bindings and Lexical Scope in Ruby | AppSignal Blog
# [Processor 4] Building a Ruby C Extension From Scratch | AppSignal Blog
# [Processor 1] Unraveling Classes, Instances and Metaclasses in Ruby | AppSignal Blog
# [Processor 0] Ruby's Hidden Gems, StringScanner | AppSignal Blog
# [Processor 2] Fibers and Enumerators in Ruby: Turning Blocks Inside Out | AppSignal Blog
# [Processor 4] Closures in Ruby: Blocks, Procs and Lambdas | AppSignal Blog
# ...

जब इसे चलाया जाता है, तब भी हमें सभी लेखों के शीर्षक मिलते हैं। हालांकि यह हर कार्य के लिए एक अलग थ्रेड का उपयोग करने जितना तेज़ नहीं है, फिर भी यह प्रारंभिक कार्यान्वयन की तुलना में तेज़ है जिसमें कोई पृष्ठभूमि प्रसंस्करण नहीं था। जोड़े गए प्रोसेसर नामों के लिए धन्यवाद, हम यह भी पुष्टि कर सकते हैं कि सभी प्रोसेसर कतार के माध्यम से काम कर रहे हैं। समवर्ती प्रोसेसर की संख्या में बदलाव करके, प्रसंस्करण गति और मौजूदा संसाधन सीमाओं के बीच संतुलन खोजना संभव है।

एकाधिक प्रक्रियाओं और मशीनों तक विस्तार

अब तक, हमारे बैकग्राउंड प्रोसेसिंग सिस्टम का वर्तमान कार्यान्वयन काफी अच्छा काम करता है। हालाँकि, यह अभी भी उसी प्रक्रिया तक सीमित है। संसाधन-भूखे कार्य अभी भी पूरी प्रक्रिया के प्रदर्शन को प्रभावित करेंगे। अंतिम चरण के रूप में, आइए कार्यभार को कई प्रक्रियाओं और यहां तक ​​कि कई मशीनों में वितरित करने पर विचार करें।

कतार उत्पादकों और उपभोक्ताओं के बीच एकमात्र संबंध है। अभी, यह इन-मेमोरी कार्यान्वयन का उपयोग कर रहा है। आइए साइडकीक से अधिक प्रेरणा लें और रेडिस का उपयोग करके एक कतार लागू करें।

रेडिस के पास सूचियों के लिए समर्थन है जो हमें कार्यों को आगे बढ़ाने और लाने की अनुमति देता है। इसके अतिरिक्त, रेडिस रूबी मणि थ्रेड-सुरक्षित है और सूचियों को संशोधित करने के लिए रेडिस आदेश परमाणु हैं। ये गुण सिंक्रोनाइज़ेशन समस्याओं में भागे बिना हमारे एसिंक्रोनस बैकग्राउंड प्रोसेसिंग सिस्टम के लिए इसका उपयोग करना संभव बनाते हैं।

आइए एक रेडिस समर्थित कतार बनाएं जो push . को लागू करता है और shift Queue . जैसी विधियाँ हमने पहले इस्तेमाल किया था।

require 'json'
require 'redis'
 
module Magique
  module Backend
    class Redis
      def initialize(connection = ::Redis.new)
        @connection = connection
      end
 
      def push(job)
        @connection.lpush('magique:queue', JSON.dump(job))
      end
 
      def shift
        _queue, job = @connection.brpop('magique:queue')
        payload = JSON.parse(job, symbolize_names: true)
        payload[:worker] = Object.const_get(payload[:worker])
        payload
      end
    end
  end
end

चूंकि रेडिस रूबी वस्तुओं के बारे में कुछ भी नहीं जानता है, हमें अपने कार्यों को डेटाबेस में संग्रहीत करने से पहले उन्हें lpush का उपयोग करके JSON में क्रमबद्ध करना होगा। आदेश जो सूची के सामने एक तत्व जोड़ता है।

किसी कार्य को कतार से लाने के लिए, हम brpop . का उपयोग कर रहे हैं कमांड, जो सूची से अंतिम तत्व प्राप्त करता है। यदि सूची खाली है, तो यह तब तक अवरुद्ध रहेगा जब तक कोई नया तत्व उपलब्ध नहीं हो जाता। जब कोई कार्य उपलब्ध न हो तो हमारे प्रोसेसर को रोकने का यह एक अच्छा तरीका है। अंत में, रेडिस से एक कार्य प्राप्त करने के बाद, हमें Object.const_get का उपयोग करके कार्यकर्ता के नाम के आधार पर वास्तविक रूबी वर्ग को देखना होगा। ।

अंतिम चरण के रूप में, आइए चीजों को कई प्रक्रियाओं में विभाजित करें। चीजों के निर्माता पक्ष पर, हमें केवल इतना करना है कि बैकएंड को हमारी नई कार्यान्वित रेडिस कतार में बदल दें।

# ...
 
Magique.backend = Magique::Backend::Redis.new
 
RUBYMAGIC.each do |url|
  TitleExtractorWorker.perform_async(url)
end

चीजों के उपभोक्ता पक्ष पर, हम इस तरह की कुछ पंक्तियों से दूर हो सकते हैं:

# ...
 
Magique.backend = Magique::Backend::Redis.new
Magique::Processor.start(5)
 
loop { sleep 1 }

निष्पादित होने पर, उपभोक्ता प्रक्रिया कतार में नए काम के आने की प्रतीक्षा करेगी। एक बार जब हम निर्माता प्रक्रिया शुरू करते हैं जो कार्यों को कतार में धकेलती है, तो हम देख सकते हैं कि वे तुरंत संसाधित हो जाते हैं।

जिम्मेदारी से आनंद लें और उत्पादन में इसका इस्तेमाल न करें

जबकि हमने इसे वास्तविक दुनिया के सेटअप से दूर रखा था जिसे आप उत्पादन में उपयोग करेंगे (इसलिए नहीं!), हमने पृष्ठभूमि प्रोसेसर बनाने में कुछ कदम उठाए। हमने एक प्रक्रिया को पृष्ठभूमि सेवा के रूप में चलाकर शुरू किया। फिर हमने इसे एसिंक्स बनाया और Queue . का इस्तेमाल किया उत्पादक-उपभोक्ता समस्या का समाधान करना। फिर हमने मेमोरी में कार्यान्वयन के बजाय रेडिस का उपयोग करके प्रक्रिया को कई प्रक्रियाओं या मशीनों में विस्तारित किया।

जैसा कि पहले उल्लेख किया गया है, यह एक पृष्ठभूमि प्रसंस्करण प्रणाली का सरलीकृत कार्यान्वयन है। बहुत सी चीजें गायब हैं और स्पष्ट रूप से निपटा नहीं गया है। इनमें त्रुटि प्रबंधन, एकाधिक कतार, शेड्यूलिंग, कनेक्शन पूलिंग और सिग्नल प्रबंधन शामिल हैं (लेकिन इन्हीं तक सीमित नहीं हैं)।

फिर भी, हमें इसे लिखने में मज़ा आया और आशा है कि आपने बैकग्राउंड प्रोसेसिंग सिस्टम के हुड के नीचे एक झलक का आनंद लिया। शायद तुमने एक-दो चीजें भी छीन लीं।


  1. आरबीएस को समझना, रूबीज न्यू टाइप एनोटेशन सिस्टम

    रूबी के लिए आरबीएस एक नए प्रकार की सिंटैक्स प्रारूप भाषा का नाम है। RBS आपको .rbs नामक एक नए एक्सटेंशन के साथ फाइलों में अपने रूबी कोड में टाइप एनोटेशन जोड़ने देता है . वे इस तरह दिखते हैं: class MyClass def my_method : (my_param: String) -> String end RBS के साथ टाइप एनोटेशन प्रदान करने से आ

  1. केग स्केल अलर्टिंग सिस्टम का निर्माण

    पिछले ब्लॉग में, मैंने विस्तार से बताया था कि कैसे मैंने अपने कार्यालय में कोल्ड ब्रू कॉफी केग के वजन का उपयोग स्लैक अलर्ट भेजने के लिए किया ताकि हमें पता चल सके कि हम कब कम चल रहे हैं और कोल्ड ब्रू कॉफी केग रिफिल की आवश्यकता है। हमारी दो-भाग श्रृंखला में इस दूसरे में, मैं आपको दिखाऊंगा कि मैंने पै

  1. रूबी में 9 नई सुविधाएँ 2.6

    रूबी का एक नया संस्करण नई सुविधाओं और प्रदर्शन में सुधार के साथ आ रहा है। क्या आप परिवर्तनों के साथ बने रहना चाहेंगे? आइए एक नज़र डालते हैं! अंतहीन रेंज रूबी 2.5 और पुराने संस्करण पहले से ही अंतहीन श्रेणी के एक रूप का समर्थन करते हैं (Float::INFINITY के साथ) ), लेकिन रूबी 2.6 इसे अगले स्तर पर ले