Computer >> कंप्यूटर >  >> प्रोग्रामिंग >> Ruby

काफ्का के साथ रेल में इवेंट स्ट्रीमिंग

कंपनियां अंतर्दृष्टि प्राप्त करने और अधिक आकर्षक ग्राहक अनुभव बनाने के लिए वास्तविक समय में बड़ी मात्रा में डेटा को संसाधित करने और साझा करने की आवश्यकता पर त्वरित प्रतिक्रिया देना चाहती हैं। इसलिए, आज की दुनिया में पारंपरिक डेटा प्रोसेसिंग अब व्यवहार्य नहीं है।

इसे प्राप्त करने के लिए, आपको जितनी जल्दी हो सके बहुत अधिक डेटा संसाधित करने की आवश्यकता है और फिर इसे अधिक प्रसंस्करण के लिए अन्य सेवाओं को भेजना होगा। लेकिन इन सभी त्वरित कार्रवाइयों के बीच, ईवेंट होने पर उपभोक्ताओं को सूचित करना आवश्यक है—और हम ईवेंट स्ट्रीमिंग का उपयोग करके ऐसा कर सकते हैं।

यह GitHub में रेपो है जिसका हम उपयोग करेंगे।

ईवेंट

इवेंट स्ट्रीमिंग के बारे में बात करने से पहले, आइए बात करते हैं कि इवेंट क्या है। किसी एप्लिकेशन के भीतर होने वाली घटना उपयोगकर्ता प्रक्रिया या व्यवसाय को प्रभावित करने वाली क्रियाओं से संबंधित हो सकती है।

घटनाएँ एक राज्य परिवर्तन का प्रतिनिधित्व करती हैं, न कि यह सवाल कि आवेदन को कैसे संशोधित किया जाए। इन पर उदाहरण के रूप में विचार करें:

  • सेवा में प्रवेश करने वाला उपयोगकर्ता
  • एक भुगतान लेनदेन
  • ब्लॉग में पोस्ट प्रकाशित करने वाला लेखक

अधिकतर मामलों में, कोई ईवेंट अधिक ईवेंट ट्रिगर करेगा; उदाहरण के लिए, जब कोई उपयोगकर्ता किसी सेवा के लिए साइन अप करता है, तो ऐप उनके डिवाइस पर एक सूचना भेजता है, डेटाबेस में रिकॉर्ड सम्मिलित करता है, और एक स्वागत योग्य ईमेल भेजता है।

ईवेंट स्ट्रीमिंग

ईवेंट स्ट्रीमिंग डेटाबेस जैसे घटना स्रोतों से वास्तविक समय में डेटा कैप्चर करने के लिए एक पैटर्न है। इवेंट स्ट्रीमिंग के मुख्य भाग इस प्रकार हैं:

  • दलाल :घटनाओं को संग्रहीत करने के लिए सिस्टम प्रभारी
  • विषय :घटनाओं की एक श्रेणी
  • निर्माता :किसी विशिष्ट विषय पर ब्रोकर को ईवेंट भेजता है
  • उपभोक्ता :घटनाओं को पढ़ता है
  • ईवेंट :डेटा जो निर्माता उपभोक्ताओं से संवाद करना चाहते हैं

पब्लिश और सब्सक्राइब आर्किटेक्चर पैटर्न (पब/सब पैटर्न) . के बारे में बात करना अनिवार्य है इस समय; इवेंट स्ट्रीमिंग उस पैटर्न का कार्यान्वयन है लेकिन इन परिवर्तनों के साथ:

  • संदेशों के बजाय ईवेंट होते हैं।
  • घटनाओं का आदेश दिया जाता है, आमतौर पर समय के अनुसार।
  • उपभोक्ता विषय में एक विशिष्ट बिंदु से घटनाओं को पढ़ सकते हैं।
  • घटनाओं में अस्थायी स्थायित्व है।

प्रवाह तब शुरू होता है जब निर्माता एक नया ईवेंट प्रकाशित करता है एक विषय . में (जैसा कि हमने पहले देखा, विषय एक विशिष्ट प्रकार की घटना के लिए सिर्फ वर्गीकरण है)। फिर, उपभोक्ता किसी विशेष श्रेणी की घटनाओं में रुचि रखने वाले उस विषय की सदस्यता लें। अंत में, दलाल विषय के उपभोक्ताओं की पहचान करता है और वांछित घटनाओं को उपलब्ध कराता है।

ईवेंट स्ट्रीमिंग के लाभ

  • डिकूपिंग प्रकाशकों और उपभोक्ताओं के बीच कोई निर्भरता नहीं है क्योंकि उन्हें एक दूसरे को जानने की आवश्यकता नहीं है। इसके अलावा, ईवेंट उनके कार्यों को निर्दिष्ट नहीं करते हैं, इसलिए कई उपभोक्ता एक ही ईवेंट प्राप्त कर सकते हैं और विभिन्न क्रियाएं कर सकते हैं।

  • कम विलंबता घटनाओं को अलग कर दिया जाता है और उपभोक्ता को कभी भी उनका उपयोग करने देता है; यह मिलीसेकंड में हो सकता है।

  • स्वतंत्रता जैसा कि हम जानते हैं, प्रकाशक और उपभोक्ता स्वतंत्र हैं, इसलिए अलग-अलग टीमें अन्य कार्यों या उद्देश्यों के लिए समान ईवेंट का उपयोग करके उनके साथ काम कर सकती हैं।

  • दोष सहिष्णुता कुछ ईवेंट स्ट्रीमिंग प्लेटफ़ॉर्म आपको उपभोक्ता विफलताओं से निपटने में मदद करते हैं; उदाहरण के लिए, यदि कोई त्रुटि होती है, तो उपभोक्ता अपनी स्थिति को सहेज सकते हैं और फिर से वहीं से शुरू कर सकते हैं।

  • रीयल-टाइम हैंडलिंग प्रतिक्रिया वास्तविक समय में प्राप्त होती है, इसलिए उपयोगकर्ताओं को अपने ईवेंट की प्रतिक्रिया देखने के लिए मिनटों या घंटों तक प्रतीक्षा करने की आवश्यकता नहीं होती है।

  • उच्च प्रदर्शन ईवेंट प्लेटफ़ॉर्म कम विलंबता के कारण कई संदेशों को हैंडल कर सकते हैं—उदाहरण के लिए, एक सेकंड में हज़ारों ईवेंट।

इवेंट स्ट्रीमिंग के नुकसान

  • निगरानी कुछ ईवेंट स्ट्रीमिंग टूल में संपूर्ण मॉनिटरिंग टूल नहीं होता है; वे डेटाडॉग या न्यू रेलिक जैसे अतिरिक्त उपकरणों को लागू करने के लिए कहते हैं।

  • कॉन्फ़िगरेशन कुछ टूल में कॉन्फ़िगरेशन अनुभवी लोगों के लिए भी भारी हो सकता है। कई पैरामीटर हैं, और कभी-कभी, आपको उन्हें लागू करने के लिए विषय के बारे में गहराई से जानना होगा।

  • क्लाइंट लाइब्रेरी जावा के अलावा अन्य भाषाओं में काफ्का को लागू करना आसान नहीं है। कभी-कभी, क्लाइंट लाइब्रेरी अद्यतित नहीं होती हैं, अस्थिरता दिखाती हैं, या चुनने के लिए कई विकल्प प्रदान नहीं करती हैं।

इवेंट स्ट्रीमिंग के लिए सबसे लोकप्रिय टूल में से एक है अपाचे काफ्का . यह उपकरण उपयोगकर्ताओं को जब भी और जहां भी आवश्यकता हो, डेटा भेजने, संग्रहीत करने और अनुरोध करने की अनुमति देता है; चलो इसके बारे में बात करते हैं।

अपाचे काफ्का

"अपाचे काफ्का एक ओपन-सोर्स डिस्ट्रीब्यूटेड इवेंट स्ट्रीमिंग प्लेटफॉर्म है, जिसका इस्तेमाल हजारों कंपनियां हाई-परफॉर्मेंस डेटा पाइपलाइन, स्ट्रीमिंग एनालिटिक्स, डेटा इंटीग्रेशन और मिशन-क्रिटिकल एप्लिकेशन के लिए करती हैं।"

विशेष रूप से रीयल-टाइम लॉग ट्रांसमिशन के लिए डिज़ाइन किया गया, अपाचे काफ्का उन अनुप्रयोगों के लिए आदर्श है जिनके लिए निम्नलिखित की आवश्यकता होती है:

  • विभिन्न घटकों के बीच विश्वसनीय डेटा विनिमय
  • एप्लिकेशन आवश्यकताओं में परिवर्तन के रूप में मैसेजिंग वर्कलोड को विभाजित करने की क्षमता
  • डेटा प्रोसेसिंग के लिए रीयल-टाइम ट्रांसमिशन

आइए रेल एप्लिकेशन में काफ्का का उपयोग करें!

रेल के साथ काफ्का का उपयोग करना

रूबी में काफ्का का उपयोग करने के लिए सबसे प्रसिद्ध रत्न को ज़ेंडेस्क द्वारा रूबी-काफ्का कहा जाता है, और यह बहुत अच्छा है! फिर भी, आपको सभी कार्यान्वयन मैन्युअल रूप से करने की आवश्यकता है, यही कारण है कि हमारे पास रूबी-काफ्का के साथ निर्मित कुछ "ढांचे" हैं। वे सभी कॉन्फ़िगरेशन और निष्पादन चरणों में भी हमारी सहायता करते हैं।

कराफ्का अपाचे काफ्का-आधारित रूबी अनुप्रयोगों के विकास को सरल बनाने के लिए उपयोग किया जाने वाला एक ढांचा है।

काफ्का के साथ काम करने के लिए, जावा को स्थापित करना आवश्यक है। क्योंकि काफ्का एक स्काला और जावा एप्लिकेशन भी है, ज़ूकीपर को स्थापित करने की आवश्यकता होगी।

स्थापना से पहले, मैं ज़ुकीपर के बारे में कुछ बताना चाहता हूँ। ज़ूकीपर एक केंद्रीकृत सेवा है जो काफ्का के लिए आवश्यक है; यह एक नए विषय के निर्माण, एक दलाल के दुर्घटनाग्रस्त होने, एक दलाल को हटाने, विषयों को हटाने, आदि जैसे परिवर्तनों के मामले में सूचनाएं भेजता है।

इसका मुख्य कार्य काफ्का दलालों का प्रबंधन करना, उनके संबंधित मेटाडेटा के साथ एक सूची बनाए रखना और स्वास्थ्य-जांच तंत्र की सुविधा प्रदान करना है। इसके अलावा, यह विषयों के विभिन्न विभाजनों के लिए अग्रणी ब्रोकर का चयन करने में मदद करता है।

आवश्यकताएं

MacOS के लिए:

अब, जावा और ज़ुकीपर को निम्नलिखित कमांड के साथ स्थापित करते हैं:

brew install java
brew install zookeeper

फिर, हम इसे चलाने के लिए काफ्का को स्थापित करना जारी रख सकते हैं:

brew install kafka

एक बार जब हम काफ्का और ज़ुकीपर स्थापित कर लेते हैं, तो सेवाओं को इस तरह से शुरू करना आवश्यक है:

brew services start zookeeper
brew services start kafka

Windows और Linux के लिए:

निर्देश:

  1. जावा इंस्टाल करना
  2. ज़ूकीपर डाउनलोड करें

रेल सेट करना

हमेशा की तरह एक साधारण रेल एप्लिकेशन बनाएं:

rails new karafka_example

और जेमफाइल के भीतर कराफ्का रत्न जोड़ें:

gem 'karafka'

फिर, bundle install चलाएं हाल ही में जोड़े गए रत्न को स्थापित करने के लिए, और सभी कराफ्का चीजों को प्राप्त करने के लिए निम्नलिखित कमांड चलाना न भूलें:

bundle exec karafka install

उस कमांड को कुछ दिलचस्प फाइलें उत्पन्न करनी चाहिए:पहला है karafka.rb रूट डायरेक्टरी में, app/consumers/application_consumer.rb , और app/responders/application_responder.rb

कराफ्का इनिशियलाइज़र

karafka.rb फ़ाइल रेल कॉन्फ़िगरेशन से अलग प्रारंभकर्ता अनुप्रयोग की तरह है। यह आपको कराफ्का एप्लिकेशन को कॉन्फ़िगर करने और एपीआई के संदर्भ में रेल एप्लिकेशन मार्गों के समान कुछ मार्ग बनाने की अनुमति देता है। लेकिन यहां, यह विषयों और उपभोक्ताओं के लिए है।

निर्माता

निर्माता ईवेंट बनाने का प्रभारी है, और हम उन्हें app/responders . में जोड़ सकते हैं फ़ोल्डर। अब, उपयोगकर्ताओं के लिए एक सरल निर्माता बनाते हैं:

# app/responders/users_responder.rb

class UsersResponder < ApplicationResponder
  topic :users

  def respond(event_payload)
    respond_to :users, event_payload
  end
end

उपभोक्ता

उपभोक्ता निर्माता द्वारा भेजे गए सभी घटनाओं/संदेशों को पढ़ने के लिए जिम्मेदार है। यह केवल एक उपभोक्ता है जो प्राप्त संदेश को लॉग करता है।

# app/consumers/users_consumer.rb

class UsersConsumer < ApplicationConsumer
  def consume
    Karafka.logger.info "New [User] event: #{params}"
  end
end

हम params . का उपयोग करते हैं घटना प्राप्त करने के लिए। लेकिन यदि आप ईवेंट को बैचों में पढ़ेंगे और आपके पास config config.batch_fetching है सत्य के रूप में, आपको params_batch . का उपयोग करना चाहिए ।

परीक्षण

हमारी कराफ्का सेवा (वह जो घटनाओं को सुन रही होगी) चलाने के लिए, कंसोल पर जाएं, एक नया टैब खोलें, रेल परियोजना पर जाएं, और चलाएं:

bundle exec karafka server

सफल इवेंट

अब, एक और कंसोल टैब खोलें, रेल प्रोजेक्ट पर जाएं, और इसे टाइप करें:

rails c

वहां, हमारे प्रतिसादकर्ता के साथ एक ईवेंट बनाएं:

> UsersResponder.call({ event_name: "user_created", payload: { user_id: 1 } })

यदि आप रेल कंसोल की जांच करते हैं, तो हमें यह संदेश ईवेंट बनने के बाद प्राप्त होगा:

Successfully appended 1 messages to users/0 on 192.168.1.77:9092 (node_id=0)
=> {"users"=>[["{\"event_name\":\"user_created\",\"payload\":{\"user_id\":1}}", {:topic=>"users"}]]}

और काराफ्का सर्विस टैब में, आपको कुछ इस तरह दिखाई देगा:

New [User] event: #<Karafka::Params::Params:0x00007fa76f0316c8>
Inline processing of topic users with 1 messages took 0 ms
1 message on users topic delegated to UsersConsumer
[[karafka_example] {}:] Marking users/0:1 as processed
[[karafka_example] {}:] Committing offsets: users/0:2
[[karafka_example] {}:] [offset_commit] Sending offset_commit API request 28 to 192.168.1.77:9092

लेकिन अगर आप केवल संदेश पेलोड चाहते हैं, तो आप params.payload . जोड़ सकते हैं आपके उपभोक्ता में और आपके पास कुछ ऐसा होगा:

Params deserialization for users topic successful in 0 ms
New [User] event: {"event_name"=>"user_created", "payload"=>{"user_id"=>1}}
Inline processing of topic users with 1 messages took 1 ms
1 message on users topic delegated to UsersConsumer

असफल इवेंट

आप कुछ विशेषताओं जैसे email . के साथ एक उपयोगकर्ता मॉडल बना सकते हैं , first_name और last_name निम्नलिखित कमांड चला रहा है:

rails g model User email first_name last_name

फिर, आप इसके साथ माइग्रेशन चला सकते हैं:

rails db:migrate

अब, इस तरह कुछ सत्यापन जोड़ें:

class User < ApplicationRecord
  validates :email, uniqueness: true
end

अंत में, हम उपभोक्ता को बदल सकते हैं:

class UsersConsumer < ApplicationConsumer
  def consume
    Karafka.logger.info "New [User] event: #{params.payload}"
    User.create!(params.payload['user'])
  end
end

तो, आइए एक ही ईमेल से दो ईवेंट बनाएं:

UsersResponder.call({ event_name: "user_created", user: { user_id: 1, email: 'batman@mail.com', first_name: 'Bruce', last_name: 'Wayne' } } )

UsersResponder.call({ event_name: "user_created", user: { user_id: 2, email: 'batman@mail.com', first_name: 'Bruce', last_name: 'Wayne' } } )

इसके साथ, डेटाबेस में पहला ईवेंट बनाया जाता है:

New [User] event: {"event_name"=>"user_created", "user"=>{"user_id"=>1, "email"=>"batman@mail.com", "first_name"=>"Bruce", "last_name"=>"Wayne"}}
[[karafka_example] {users: 0}:] [fetch] Received response 2 from 192.168.1.77:9092
[[karafka_example] {users: 0}:] Fetching batches
[[karafka_example] {users: 0}:] [fetch] Sending fetch API request 3 to 192.168.1.77:9092
[[karafka_example] {users: 0}:] [fetch] Waiting for response 3 from 192.168.1.77:9092
  TRANSACTION (0.1ms)  BEGIN
  ↳ app/consumers/users_consumer.rb:14:in `consume'
  User Create (9.6ms)  INSERT INTO "users" ("user_id", "email", "first_name", "last_name", "created_at", "updated_at") VALUES ($1, $2, $3, $4, $5, $6) RETURNING "id"  [["user_id", "1"], ["email", "batman@mail.com"], ["first_name", "Bruce"], ["last_name", "Wayne"], ["created_at", "2021-03-10 04:29:14.827778"], ["updated_at", "2021-03-10 04:29:14.827778"]]
  ↳ app/consumers/users_consumer.rb:14:in `consume'
  TRANSACTION (5.0ms)  COMMIT
  ↳ app/consumers/users_consumer.rb:14:in `consume'
Inline processing of topic users with 1 messages took 70 ms
1 message on users topic delegated to UsersConsumer

लेकिन दूसरा विफल हो जाएगा, क्योंकि हमारे पास एक सत्यापन है जो कहता है कि ईमेल अद्वितीय है। यदि आप किसी मौजूदा ईमेल के साथ कोई अन्य रिकॉर्ड जोड़ने का प्रयास करते हैं, तो आपको कुछ ऐसा दिखाई देगा:

New [User] event: {"event_name"=>"user_created", "user"=>{"user_id"=>2, "email"=>"batman@mail.com", "first_name"=>"Bruce", "last_name"=>"Wayne"}}
[[karafka_example] {users: 0}:] [fetch] Received response 2 from 192.168.1.77:9092
[[karafka_example] {users: 0}:] Fetching batches
[[karafka_example] {users: 0}:] [fetch] Sending fetch API request 3 to 192.168.1.77:9092
[[karafka_example] {users: 0}:] [fetch] Waiting for response 3 from 192.168.1.77:9092
  TRANSACTION (0.2ms)  BEGIN
  ↳ app/consumers/users_consumer.rb:14:in `consume'
  User Exists? (0.3ms)  SELECT 1 AS one FROM "users" WHERE "users"."email" = $1 LIMIT $2  [["email", "batman@mail.com"], ["LIMIT", 1]]
  ↳ app/consumers/users_consumer.rb:14:in `consume'
  TRANSACTION (0.2ms)  ROLLBACK
  ↳ app/consumers/users_consumer.rb:14:in `consume'
[[karafka_example] {users: 0}:] Exception raised when processing users/0 at offset 42 -- ActiveRecord::RecordInvalid: Validation failed: Email has already been taken

आप अंतिम पंक्ति में त्रुटि देख सकते हैं ActiveRecord::RecordInvalid: Validation failed: Email has already been taken . लेकिन यहां दिलचस्प बात यह है कि काफ्का इस आयोजन को बार-बार प्रोसेस करने की कोशिश करेगा। यहां तक ​​​​कि अगर आप कराफ्का सर्वर को पुनरारंभ करते हैं, तो यह अंतिम घटना को संसाधित करने का प्रयास करेगा। काफ्का कैसे जानता है कि कहां से शुरू करें?

यदि आप अपना कंसोल देखते हैं, तो त्रुटि के बाद, आप इसे देखेंगे:

[[karafka_example] {users: 0}:] Exception raised when processing users/0 at offset 42

यह आपको बताएगा कि किस ऑफ़सेट को संसाधित किया गया था:इस मामले में, यह ऑफ़सेट 42 था। इसलिए, यदि आप काराफ्का सेवा को पुनरारंभ करते हैं, तो यह उस ऑफ़सेट में प्रारंभ हो जाएगा।

[[karafka_example] {}:] Committing offsets with recommit: users/0:42
[[karafka_example] {users: 0}:] Fetching batches

यह अभी भी विफल होगा क्योंकि हमारे पास हमारे उपयोगकर्ता मॉडल में ईमेल सत्यापन है। इस बिंदु पर, कराफ्का सर्वर को रोकें, उस सत्यापन को हटा दें या उस पर टिप्पणी करें, और अपना सर्वर फिर से शुरू करें; आप देखेंगे कि ईवेंट को सफलतापूर्वक कैसे संसाधित किया जाता है:

[[karafka_example] {}:] Committing offsets with recommit: users/0:42
[[karafka_example] {}:] [offset_commit] Sending offset_commit API request 5 to 192.168.1.77:9092
[[karafka_example] {}:] [offset_commit] Waiting for response 5 from 192.168.1.77:9092
[[karafka_example] {}:] [offset_commit] Received response 5 from 192.168.1.77:9092
Params deserialization for users topic successful in 0 ms
New [User] event: {"event_name"=>"user_created", "user"=>{"user_id"=>2, "email"=>"batman@mail.com", "first_name"=>"Bruce", "last_name"=>"Wayne"}}
  TRANSACTION (0.2ms)  BEGIN
  ↳ app/consumers/users_consumer.rb:14:in `consume'
  User Create (3.8ms)  INSERT INTO "users" ("user_id", "email", "first_name", "last_name", "created_at", "updated_at") VALUES ($1, $2, $3, $4, $5, $6) RETURNING "id"  [["user_id", "2"], ["email", "batman@mail.com"], ["first_name", "Bruce"], ["last_name", "Wayne"], ["created_at", "2021-03-10 04:49:37.832452"], ["updated_at", "2021-03-10 04:49:37.832452"]]
  ↳ app/consumers/users_consumer.rb:14:in `consume'
  TRANSACTION (5.5ms)  COMMIT
  ↳ app/consumers/users_consumer.rb:14:in `consume'
Inline processing of topic users with 1 messages took 69 ms
1 message on users topic delegated to UsersConsumer
[[karafka_example] {}:] Marking users/0:43 as processed

अंत में, आप इस संदेश को अंतिम पंक्ति में देख सकते हैं:Marking users/0:43 as processed

कॉलबैक

यह कुछ अच्छा है जो काराफ्का प्रदान करता है:आप अपने उपभोक्ता में कॉलबैक का उपयोग कर सकते हैं। ऐसा करने के लिए, आपको केवल मॉड्यूल आयात करने और उनका उपयोग करने की आवश्यकता है। फिर, अपना UserConsumer खोलें और इसे जोड़ें:

class UsersConsumer < ApplicationConsumer
  include Karafka::Consumers::Callbacks

  before_poll do
    Karafka.logger.info "*** Checking something new for #{topic.name}"
  end

  after_poll do
    Karafka.logger.info '*** We just checked for new messages!'
  end

  def consume
    Karafka.logger.info "New [User] event: #{params.payload}"
    User.create!(params.payload['user'])
  end
end

पोल वह माध्यम है जिसके माध्यम से हम वर्तमान विभाजन ऑफ़सेट के आधार पर रिकॉर्ड प्राप्त करते हैं। तो, वे कॉलबैक before_poll और after_poll , जैसा कि उनके नाम से पता चलता है, उस समय निष्पादित किए जाते हैं। हम सिर्फ एक संदेश लॉग कर रहे हैं, और आप उन्हें अपने कराफ्का सर्वर में देख सकते हैं—एक लाने से पहले और दूसरा उसके बाद:

*** Checking something new for users
[[karafka_example] {}:] No batches to process
[[karafka_example] {users: 0}:] [fetch] Received response 325 from 192.168.1.77:9092
[[karafka_example] {users: 0}:] Fetching batches
[[karafka_example] {users: 0}:] [fetch] Sending fetch API request 326 to 192.168.1.77:9092
[[karafka_example] {users: 0}:] [fetch] Waiting for response 326 from 192.168.1.77:9092
*** We just checked for new messages!

दिल की धड़कन

दिल की धड़कन ठीक उसी तरह है जैसे हम, उपभोक्ता के रूप में, काफ्का से कहते हैं कि हम जीवित हैं; अन्यथा, काफ्का मान लेगा कि उपभोक्ता मर चुका है।

काराफ्का में, हमारे पास समय की अवधि में ऐसा करने के लिए एक डिफ़ॉल्ट कॉन्फ़िगरेशन है; यह kafka.heartbeat_interval . है और डिफ़ॉल्ट 10 सेकंड है। आप इस दिल की धड़कन को अपने कराफ्का सर्वर में देख सकते हैं।

*** Checking something new for users
[[karafka_example_example] {}:] Sending heartbeat...
[[karafka_example_example] {}:] [heartbeat] Sending heartbeat API request 72 to 192.168.1.77:9092
[[karafka_example_example] {}:] [heartbeat] Waiting for response 72 from 192.168.1.77:9092
[[karafka_example_example] {}:] [heartbeat] Received response 72 from 192.168.1.77:9092
*** We just checked for new messages!

Sending heartbeat... . के साथ , काफ्का जानता है कि हम जीवित हैं और हम इसके उपभोक्ता समूह के वैध सदस्य हैं। साथ ही, हम अधिक रिकॉर्ड का उपभोग कर सकते हैं।

प्रतिबद्ध करें

ऑफ़सेट को उपभोग के रूप में चिह्नित करना ऑफ़सेट करना कहलाता है। काफ्का में, हम एक आंतरिक काफ्का विषय पर लिखकर ऑफ़सेट कमिट रिकॉर्ड करते हैं जिसे ऑफ़सेट विषय कहा जाता है। किसी संदेश को केवल तभी उपभोग माना जाता है जब उसका ऑफ़सेट ऑफ़सेट विषय के लिए प्रतिबद्ध हो।

इस प्रतिबद्धता को हर बार स्वचालित रूप से करने के लिए काराफ्का के पास एक विन्यास है; विन्यास kafka.offset_commit_interval . है , और इसका मान डिफ़ॉल्ट रूप से 10 सेकंड है। इसके साथ, करकफा हर 10 सेकंड में एक ऑफसेट कमिट करेगा, और आप उस संदेश को अपने कराफ्का सर्वर में देख सकते हैं:

*** Checking something new for users
[[karafka_example] {}:] No batches to process
[[karafka_example] {users: 0}:] [fetch] Received response 307 from 192.168.1.77:9092
[[karafka_example] {users: 0}:] Fetching batches
[[karafka_example] {users: 0}:] [fetch] Sending fetch API request 308 to 192.168.1.77:9092
[[karafka_example] {users: 0}:] [fetch] Waiting for response 308 from 192.168.1.77:9092
[[karafka_example] {}:] Committing offsets: users/0:44
[[karafka_example] {}:] [offset_commit] Sending offset_commit API request 69 to 192.168.1.77:9092
[[karafka_example] {}:] [offset_commit] Waiting for response 69 from 192.168.1.77:9092
[[karafka_example] {}:] [offset_commit] Received response 69 from 192.168.1.77:9092
*** We just checked for new messages!

Committing offsets: users/0:44 हमें बताएं कि यह कौन सा ऑफसेट कर रहा है; मेरे मामले में, उसने काफ्का को बताया कि वह विषय 0 से ऑफसेट नंबर 44 को कमिट कर सकता है। इस तरह, अगर हमारी सेवा के साथ कुछ होता है, तो काराफ्का उस ऑफसेट से घटनाओं को संसाधित करने के लिए फिर से शुरू कर सकता है।

निष्कर्ष

ईवेंट स्ट्रीमिंग हमें तेज़ होने, डेटा का बेहतर उपयोग करने और बेहतर उपयोगकर्ता अनुभव डिज़ाइन करने में मदद करती है। तथ्य की बात के रूप में, कई कंपनियां अपनी सभी सेवाओं को संप्रेषित करने और वास्तविक समय में विभिन्न घटनाओं पर प्रतिक्रिया करने में सक्षम होने के लिए इस पैटर्न का उपयोग कर रही हैं। जैसा कि मैंने पहले उल्लेख किया है, काराफ्का के अलावा अन्य विकल्प हैं जिनका उपयोग आप रेल के साथ कर सकते हैं। आपके पास पहले से ही मूल बातें हैं; अब, बेझिझक उनके साथ प्रयोग करें।

संदर्भ

  • https://kafka.apache.org/
  • https://github.com/karafka/karafka
  • https://en.wikipedia.org/wiki/Publish%E2%80%93subscribe_pattern

  1. बायबग, रेल और पाउ ​​के साथ रिमोट डिबगिंग

    यदि आपने पहले बायबग नहीं देखा है, तो मेरा सुझाव है कि आप इसे देखें। रूबी 2.x के लिए यह एक अच्छा डीबगर है। इसके लेखकों के शब्दों में: Byebug रूबी 2 के लिए उपयोग करने के लिए एक आसान, सुविधा संपन्न डिबगर है। यह निष्पादन नियंत्रण के लिए नए TracePoint API और कॉल स्टैक नेविगेशन के लिए नए डीबग इंस्पेक्टर A

  1. रेल के साथ टेलविंड सीएसएस का उपयोग करना

    CSS जादुई है लेकिन समय लेने वाली है। सुंदर, कार्यात्मक और सुलभ साइटों का उपयोग करना एक खुशी है, लेकिन अपना स्वयं का सीएसएस लिखना थकाऊ है। बूटस्ट्रैप जैसी कई CSS लाइब्रेरी में हाल के वर्षों में विस्फोट हुआ है और 2021 में Tailwind इस पैक में सबसे आगे है। हालांकि रेल टेलविंड आउट ऑफ बॉक्स के साथ नहीं आ

  1. रेल के साथ कोणीय का उपयोग करना 5

    आपने पहले कहानी सुनी है। आपके पास पहले से ही आपके विकेन्द्रीकृत और पूरी तरह से काम कर रहे बैक-एंड एपीआई और किसी भी सामान्य टूलसेट से बने फ्रंट-एंड पर चलने वाला एक एप्लिकेशन है। अब, आप कोणीय पर आगे बढ़ना चाहते हैं। या, शायद आप अपनी रेल परियोजनाओं के साथ एंगुलर को एकीकृत करने का एक तरीका ढूंढ रहे हैं