कंपनियां अंतर्दृष्टि प्राप्त करने और अधिक आकर्षक ग्राहक अनुभव बनाने के लिए वास्तविक समय में बड़ी मात्रा में डेटा को संसाधित करने और साझा करने की आवश्यकता पर त्वरित प्रतिक्रिया देना चाहती हैं। इसलिए, आज की दुनिया में पारंपरिक डेटा प्रोसेसिंग अब व्यवहार्य नहीं है।
इसे प्राप्त करने के लिए, आपको जितनी जल्दी हो सके बहुत अधिक डेटा संसाधित करने की आवश्यकता है और फिर इसे अधिक प्रसंस्करण के लिए अन्य सेवाओं को भेजना होगा। लेकिन इन सभी त्वरित कार्रवाइयों के बीच, ईवेंट होने पर उपभोक्ताओं को सूचित करना आवश्यक है—और हम ईवेंट स्ट्रीमिंग का उपयोग करके ऐसा कर सकते हैं।
यह GitHub में रेपो है जिसका हम उपयोग करेंगे।
ईवेंट
इवेंट स्ट्रीमिंग के बारे में बात करने से पहले, आइए बात करते हैं कि इवेंट क्या है। किसी एप्लिकेशन के भीतर होने वाली घटना उपयोगकर्ता प्रक्रिया या व्यवसाय को प्रभावित करने वाली क्रियाओं से संबंधित हो सकती है।
घटनाएँ एक राज्य परिवर्तन का प्रतिनिधित्व करती हैं, न कि यह सवाल कि आवेदन को कैसे संशोधित किया जाए। इन पर उदाहरण के रूप में विचार करें:
- सेवा में प्रवेश करने वाला उपयोगकर्ता
- एक भुगतान लेनदेन
- ब्लॉग में पोस्ट प्रकाशित करने वाला लेखक
अधिकतर मामलों में, कोई ईवेंट अधिक ईवेंट ट्रिगर करेगा; उदाहरण के लिए, जब कोई उपयोगकर्ता किसी सेवा के लिए साइन अप करता है, तो ऐप उनके डिवाइस पर एक सूचना भेजता है, डेटाबेस में रिकॉर्ड सम्मिलित करता है, और एक स्वागत योग्य ईमेल भेजता है।
ईवेंट स्ट्रीमिंग
ईवेंट स्ट्रीमिंग डेटाबेस जैसे घटना स्रोतों से वास्तविक समय में डेटा कैप्चर करने के लिए एक पैटर्न है। इवेंट स्ट्रीमिंग के मुख्य भाग इस प्रकार हैं:
- दलाल :घटनाओं को संग्रहीत करने के लिए सिस्टम प्रभारी
- विषय :घटनाओं की एक श्रेणी
- निर्माता :किसी विशिष्ट विषय पर ब्रोकर को ईवेंट भेजता है
- उपभोक्ता :घटनाओं को पढ़ता है
- ईवेंट :डेटा जो निर्माता उपभोक्ताओं से संवाद करना चाहते हैं
पब्लिश और सब्सक्राइब आर्किटेक्चर पैटर्न (पब/सब पैटर्न) . के बारे में बात करना अनिवार्य है इस समय; इवेंट स्ट्रीमिंग उस पैटर्न का कार्यान्वयन है लेकिन इन परिवर्तनों के साथ:
- संदेशों के बजाय ईवेंट होते हैं।
- घटनाओं का आदेश दिया जाता है, आमतौर पर समय के अनुसार।
- उपभोक्ता विषय में एक विशिष्ट बिंदु से घटनाओं को पढ़ सकते हैं।
- घटनाओं में अस्थायी स्थायित्व है।
प्रवाह तब शुरू होता है जब निर्माता एक नया ईवेंट प्रकाशित करता है एक विषय . में (जैसा कि हमने पहले देखा, विषय एक विशिष्ट प्रकार की घटना के लिए सिर्फ वर्गीकरण है)। फिर, उपभोक्ता किसी विशेष श्रेणी की घटनाओं में रुचि रखने वाले उस विषय की सदस्यता लें। अंत में, दलाल विषय के उपभोक्ताओं की पहचान करता है और वांछित घटनाओं को उपलब्ध कराता है।
ईवेंट स्ट्रीमिंग के लाभ
-
डिकूपिंग प्रकाशकों और उपभोक्ताओं के बीच कोई निर्भरता नहीं है क्योंकि उन्हें एक दूसरे को जानने की आवश्यकता नहीं है। इसके अलावा, ईवेंट उनके कार्यों को निर्दिष्ट नहीं करते हैं, इसलिए कई उपभोक्ता एक ही ईवेंट प्राप्त कर सकते हैं और विभिन्न क्रियाएं कर सकते हैं।
-
कम विलंबता घटनाओं को अलग कर दिया जाता है और उपभोक्ता को कभी भी उनका उपयोग करने देता है; यह मिलीसेकंड में हो सकता है।
-
स्वतंत्रता जैसा कि हम जानते हैं, प्रकाशक और उपभोक्ता स्वतंत्र हैं, इसलिए अलग-अलग टीमें अन्य कार्यों या उद्देश्यों के लिए समान ईवेंट का उपयोग करके उनके साथ काम कर सकती हैं।
-
दोष सहिष्णुता कुछ ईवेंट स्ट्रीमिंग प्लेटफ़ॉर्म आपको उपभोक्ता विफलताओं से निपटने में मदद करते हैं; उदाहरण के लिए, यदि कोई त्रुटि होती है, तो उपभोक्ता अपनी स्थिति को सहेज सकते हैं और फिर से वहीं से शुरू कर सकते हैं।
-
रीयल-टाइम हैंडलिंग प्रतिक्रिया वास्तविक समय में प्राप्त होती है, इसलिए उपयोगकर्ताओं को अपने ईवेंट की प्रतिक्रिया देखने के लिए मिनटों या घंटों तक प्रतीक्षा करने की आवश्यकता नहीं होती है।
-
उच्च प्रदर्शन ईवेंट प्लेटफ़ॉर्म कम विलंबता के कारण कई संदेशों को हैंडल कर सकते हैं—उदाहरण के लिए, एक सेकंड में हज़ारों ईवेंट।
इवेंट स्ट्रीमिंग के नुकसान
-
निगरानी कुछ ईवेंट स्ट्रीमिंग टूल में संपूर्ण मॉनिटरिंग टूल नहीं होता है; वे डेटाडॉग या न्यू रेलिक जैसे अतिरिक्त उपकरणों को लागू करने के लिए कहते हैं।
-
कॉन्फ़िगरेशन कुछ टूल में कॉन्फ़िगरेशन अनुभवी लोगों के लिए भी भारी हो सकता है। कई पैरामीटर हैं, और कभी-कभी, आपको उन्हें लागू करने के लिए विषय के बारे में गहराई से जानना होगा।
-
क्लाइंट लाइब्रेरी जावा के अलावा अन्य भाषाओं में काफ्का को लागू करना आसान नहीं है। कभी-कभी, क्लाइंट लाइब्रेरी अद्यतित नहीं होती हैं, अस्थिरता दिखाती हैं, या चुनने के लिए कई विकल्प प्रदान नहीं करती हैं।
इवेंट स्ट्रीमिंग के लिए सबसे लोकप्रिय टूल में से एक है अपाचे काफ्का . यह उपकरण उपयोगकर्ताओं को जब भी और जहां भी आवश्यकता हो, डेटा भेजने, संग्रहीत करने और अनुरोध करने की अनुमति देता है; चलो इसके बारे में बात करते हैं।
अपाचे काफ्का
"अपाचे काफ्का एक ओपन-सोर्स डिस्ट्रीब्यूटेड इवेंट स्ट्रीमिंग प्लेटफॉर्म है, जिसका इस्तेमाल हजारों कंपनियां हाई-परफॉर्मेंस डेटा पाइपलाइन, स्ट्रीमिंग एनालिटिक्स, डेटा इंटीग्रेशन और मिशन-क्रिटिकल एप्लिकेशन के लिए करती हैं।"
विशेष रूप से रीयल-टाइम लॉग ट्रांसमिशन के लिए डिज़ाइन किया गया, अपाचे काफ्का उन अनुप्रयोगों के लिए आदर्श है जिनके लिए निम्नलिखित की आवश्यकता होती है:
- विभिन्न घटकों के बीच विश्वसनीय डेटा विनिमय
- एप्लिकेशन आवश्यकताओं में परिवर्तन के रूप में मैसेजिंग वर्कलोड को विभाजित करने की क्षमता
- डेटा प्रोसेसिंग के लिए रीयल-टाइम ट्रांसमिशन
आइए रेल एप्लिकेशन में काफ्का का उपयोग करें!
रेल के साथ काफ्का का उपयोग करना
रूबी में काफ्का का उपयोग करने के लिए सबसे प्रसिद्ध रत्न को ज़ेंडेस्क द्वारा रूबी-काफ्का कहा जाता है, और यह बहुत अच्छा है! फिर भी, आपको सभी कार्यान्वयन मैन्युअल रूप से करने की आवश्यकता है, यही कारण है कि हमारे पास रूबी-काफ्का के साथ निर्मित कुछ "ढांचे" हैं। वे सभी कॉन्फ़िगरेशन और निष्पादन चरणों में भी हमारी सहायता करते हैं।
कराफ्का अपाचे काफ्का-आधारित रूबी अनुप्रयोगों के विकास को सरल बनाने के लिए उपयोग किया जाने वाला एक ढांचा है।
काफ्का के साथ काम करने के लिए, जावा को स्थापित करना आवश्यक है। क्योंकि काफ्का एक स्काला और जावा एप्लिकेशन भी है, ज़ूकीपर को स्थापित करने की आवश्यकता होगी।
स्थापना से पहले, मैं ज़ुकीपर के बारे में कुछ बताना चाहता हूँ। ज़ूकीपर एक केंद्रीकृत सेवा है जो काफ्का के लिए आवश्यक है; यह एक नए विषय के निर्माण, एक दलाल के दुर्घटनाग्रस्त होने, एक दलाल को हटाने, विषयों को हटाने, आदि जैसे परिवर्तनों के मामले में सूचनाएं भेजता है।
इसका मुख्य कार्य काफ्का दलालों का प्रबंधन करना, उनके संबंधित मेटाडेटा के साथ एक सूची बनाए रखना और स्वास्थ्य-जांच तंत्र की सुविधा प्रदान करना है। इसके अलावा, यह विषयों के विभिन्न विभाजनों के लिए अग्रणी ब्रोकर का चयन करने में मदद करता है।
आवश्यकताएं
MacOS के लिए:
अब, जावा और ज़ुकीपर को निम्नलिखित कमांड के साथ स्थापित करते हैं:
brew install java
brew install zookeeper
फिर, हम इसे चलाने के लिए काफ्का को स्थापित करना जारी रख सकते हैं:
brew install kafka
एक बार जब हम काफ्का और ज़ुकीपर स्थापित कर लेते हैं, तो सेवाओं को इस तरह से शुरू करना आवश्यक है:
brew services start zookeeper
brew services start kafka
Windows और Linux के लिए:
निर्देश:
- जावा इंस्टाल करना
- ज़ूकीपर डाउनलोड करें
रेल सेट करना
हमेशा की तरह एक साधारण रेल एप्लिकेशन बनाएं:
rails new karafka_example
और जेमफाइल के भीतर कराफ्का रत्न जोड़ें:
gem 'karafka'
फिर, bundle install
चलाएं हाल ही में जोड़े गए रत्न को स्थापित करने के लिए, और सभी कराफ्का चीजों को प्राप्त करने के लिए निम्नलिखित कमांड चलाना न भूलें:
bundle exec karafka install
उस कमांड को कुछ दिलचस्प फाइलें उत्पन्न करनी चाहिए:पहला है karafka.rb
रूट डायरेक्टरी में, app/consumers/application_consumer.rb
, और app/responders/application_responder.rb
।
कराफ्का इनिशियलाइज़र
karafka.rb
फ़ाइल रेल कॉन्फ़िगरेशन से अलग प्रारंभकर्ता अनुप्रयोग की तरह है। यह आपको कराफ्का एप्लिकेशन को कॉन्फ़िगर करने और एपीआई के संदर्भ में रेल एप्लिकेशन मार्गों के समान कुछ मार्ग बनाने की अनुमति देता है। लेकिन यहां, यह विषयों और उपभोक्ताओं के लिए है।
निर्माता
निर्माता ईवेंट बनाने का प्रभारी है, और हम उन्हें app/responders
. में जोड़ सकते हैं फ़ोल्डर। अब, उपयोगकर्ताओं के लिए एक सरल निर्माता बनाते हैं:
# app/responders/users_responder.rb
class UsersResponder < ApplicationResponder
topic :users
def respond(event_payload)
respond_to :users, event_payload
end
end
उपभोक्ता
उपभोक्ता निर्माता द्वारा भेजे गए सभी घटनाओं/संदेशों को पढ़ने के लिए जिम्मेदार है। यह केवल एक उपभोक्ता है जो प्राप्त संदेश को लॉग करता है।
# app/consumers/users_consumer.rb
class UsersConsumer < ApplicationConsumer
def consume
Karafka.logger.info "New [User] event: #{params}"
end
end
हम params
. का उपयोग करते हैं घटना प्राप्त करने के लिए। लेकिन यदि आप ईवेंट को बैचों में पढ़ेंगे और आपके पास config config.batch_fetching
है सत्य के रूप में, आपको params_batch
. का उपयोग करना चाहिए ।
परीक्षण
हमारी कराफ्का सेवा (वह जो घटनाओं को सुन रही होगी) चलाने के लिए, कंसोल पर जाएं, एक नया टैब खोलें, रेल परियोजना पर जाएं, और चलाएं:
bundle exec karafka server
सफल इवेंट
अब, एक और कंसोल टैब खोलें, रेल प्रोजेक्ट पर जाएं, और इसे टाइप करें:
rails c
वहां, हमारे प्रतिसादकर्ता के साथ एक ईवेंट बनाएं:
> UsersResponder.call({ event_name: "user_created", payload: { user_id: 1 } })
यदि आप रेल कंसोल की जांच करते हैं, तो हमें यह संदेश ईवेंट बनने के बाद प्राप्त होगा:
Successfully appended 1 messages to users/0 on 192.168.1.77:9092 (node_id=0)
=> {"users"=>[["{\"event_name\":\"user_created\",\"payload\":{\"user_id\":1}}", {:topic=>"users"}]]}
और काराफ्का सर्विस टैब में, आपको कुछ इस तरह दिखाई देगा:
New [User] event: #<Karafka::Params::Params:0x00007fa76f0316c8>
Inline processing of topic users with 1 messages took 0 ms
1 message on users topic delegated to UsersConsumer
[[karafka_example] {}:] Marking users/0:1 as processed
[[karafka_example] {}:] Committing offsets: users/0:2
[[karafka_example] {}:] [offset_commit] Sending offset_commit API request 28 to 192.168.1.77:9092
लेकिन अगर आप केवल संदेश पेलोड चाहते हैं, तो आप params.payload
. जोड़ सकते हैं आपके उपभोक्ता में और आपके पास कुछ ऐसा होगा:
Params deserialization for users topic successful in 0 ms
New [User] event: {"event_name"=>"user_created", "payload"=>{"user_id"=>1}}
Inline processing of topic users with 1 messages took 1 ms
1 message on users topic delegated to UsersConsumer
असफल इवेंट
आप कुछ विशेषताओं जैसे email
. के साथ एक उपयोगकर्ता मॉडल बना सकते हैं , first_name
और last_name
निम्नलिखित कमांड चला रहा है:
rails g model User email first_name last_name
फिर, आप इसके साथ माइग्रेशन चला सकते हैं:
rails db:migrate
अब, इस तरह कुछ सत्यापन जोड़ें:
class User < ApplicationRecord
validates :email, uniqueness: true
end
अंत में, हम उपभोक्ता को बदल सकते हैं:
class UsersConsumer < ApplicationConsumer
def consume
Karafka.logger.info "New [User] event: #{params.payload}"
User.create!(params.payload['user'])
end
end
तो, आइए एक ही ईमेल से दो ईवेंट बनाएं:
UsersResponder.call({ event_name: "user_created", user: { user_id: 1, email: '[email protected]', first_name: 'Bruce', last_name: 'Wayne' } } )
UsersResponder.call({ event_name: "user_created", user: { user_id: 2, email: '[email protected]', first_name: 'Bruce', last_name: 'Wayne' } } )
इसके साथ, डेटाबेस में पहला ईवेंट बनाया जाता है:
New [User] event: {"event_name"=>"user_created", "user"=>{"user_id"=>1, "email"=>"[email protected]", "first_name"=>"Bruce", "last_name"=>"Wayne"}}
[[karafka_example] {users: 0}:] [fetch] Received response 2 from 192.168.1.77:9092
[[karafka_example] {users: 0}:] Fetching batches
[[karafka_example] {users: 0}:] [fetch] Sending fetch API request 3 to 192.168.1.77:9092
[[karafka_example] {users: 0}:] [fetch] Waiting for response 3 from 192.168.1.77:9092
TRANSACTION (0.1ms) BEGIN
↳ app/consumers/users_consumer.rb:14:in `consume'
User Create (9.6ms) INSERT INTO "users" ("user_id", "email", "first_name", "last_name", "created_at", "updated_at") VALUES ($1, $2, $3, $4, $5, $6) RETURNING "id" [["user_id", "1"], ["email", "[email protected]"], ["first_name", "Bruce"], ["last_name", "Wayne"], ["created_at", "2021-03-10 04:29:14.827778"], ["updated_at", "2021-03-10 04:29:14.827778"]]
↳ app/consumers/users_consumer.rb:14:in `consume'
TRANSACTION (5.0ms) COMMIT
↳ app/consumers/users_consumer.rb:14:in `consume'
Inline processing of topic users with 1 messages took 70 ms
1 message on users topic delegated to UsersConsumer
लेकिन दूसरा विफल हो जाएगा, क्योंकि हमारे पास एक सत्यापन है जो कहता है कि ईमेल अद्वितीय है। यदि आप किसी मौजूदा ईमेल के साथ कोई अन्य रिकॉर्ड जोड़ने का प्रयास करते हैं, तो आपको कुछ ऐसा दिखाई देगा:
New [User] event: {"event_name"=>"user_created", "user"=>{"user_id"=>2, "email"=>"[email protected]", "first_name"=>"Bruce", "last_name"=>"Wayne"}}
[[karafka_example] {users: 0}:] [fetch] Received response 2 from 192.168.1.77:9092
[[karafka_example] {users: 0}:] Fetching batches
[[karafka_example] {users: 0}:] [fetch] Sending fetch API request 3 to 192.168.1.77:9092
[[karafka_example] {users: 0}:] [fetch] Waiting for response 3 from 192.168.1.77:9092
TRANSACTION (0.2ms) BEGIN
↳ app/consumers/users_consumer.rb:14:in `consume'
User Exists? (0.3ms) SELECT 1 AS one FROM "users" WHERE "users"."email" = $1 LIMIT $2 [["email", "[email protected]"], ["LIMIT", 1]]
↳ app/consumers/users_consumer.rb:14:in `consume'
TRANSACTION (0.2ms) ROLLBACK
↳ app/consumers/users_consumer.rb:14:in `consume'
[[karafka_example] {users: 0}:] Exception raised when processing users/0 at offset 42 -- ActiveRecord::RecordInvalid: Validation failed: Email has already been taken
आप अंतिम पंक्ति में त्रुटि देख सकते हैं ActiveRecord::RecordInvalid: Validation failed: Email has already been taken
. लेकिन यहां दिलचस्प बात यह है कि काफ्का इस आयोजन को बार-बार प्रोसेस करने की कोशिश करेगा। यहां तक कि अगर आप कराफ्का सर्वर को पुनरारंभ करते हैं, तो यह अंतिम घटना को संसाधित करने का प्रयास करेगा। काफ्का कैसे जानता है कि कहां से शुरू करें?
यदि आप अपना कंसोल देखते हैं, तो त्रुटि के बाद, आप इसे देखेंगे:
[[karafka_example] {users: 0}:] Exception raised when processing users/0 at offset 42
यह आपको बताएगा कि किस ऑफ़सेट को संसाधित किया गया था:इस मामले में, यह ऑफ़सेट 42 था। इसलिए, यदि आप काराफ्का सेवा को पुनरारंभ करते हैं, तो यह उस ऑफ़सेट में प्रारंभ हो जाएगा।
[[karafka_example] {}:] Committing offsets with recommit: users/0:42
[[karafka_example] {users: 0}:] Fetching batches
यह अभी भी विफल होगा क्योंकि हमारे पास हमारे उपयोगकर्ता मॉडल में ईमेल सत्यापन है। इस बिंदु पर, कराफ्का सर्वर को रोकें, उस सत्यापन को हटा दें या उस पर टिप्पणी करें, और अपना सर्वर फिर से शुरू करें; आप देखेंगे कि ईवेंट को सफलतापूर्वक कैसे संसाधित किया जाता है:
[[karafka_example] {}:] Committing offsets with recommit: users/0:42
[[karafka_example] {}:] [offset_commit] Sending offset_commit API request 5 to 192.168.1.77:9092
[[karafka_example] {}:] [offset_commit] Waiting for response 5 from 192.168.1.77:9092
[[karafka_example] {}:] [offset_commit] Received response 5 from 192.168.1.77:9092
Params deserialization for users topic successful in 0 ms
New [User] event: {"event_name"=>"user_created", "user"=>{"user_id"=>2, "email"=>"[email protected]", "first_name"=>"Bruce", "last_name"=>"Wayne"}}
TRANSACTION (0.2ms) BEGIN
↳ app/consumers/users_consumer.rb:14:in `consume'
User Create (3.8ms) INSERT INTO "users" ("user_id", "email", "first_name", "last_name", "created_at", "updated_at") VALUES ($1, $2, $3, $4, $5, $6) RETURNING "id" [["user_id", "2"], ["email", "[email protected]"], ["first_name", "Bruce"], ["last_name", "Wayne"], ["created_at", "2021-03-10 04:49:37.832452"], ["updated_at", "2021-03-10 04:49:37.832452"]]
↳ app/consumers/users_consumer.rb:14:in `consume'
TRANSACTION (5.5ms) COMMIT
↳ app/consumers/users_consumer.rb:14:in `consume'
Inline processing of topic users with 1 messages took 69 ms
1 message on users topic delegated to UsersConsumer
[[karafka_example] {}:] Marking users/0:43 as processed
अंत में, आप इस संदेश को अंतिम पंक्ति में देख सकते हैं:Marking users/0:43 as processed
।
कॉलबैक
यह कुछ अच्छा है जो काराफ्का प्रदान करता है:आप अपने उपभोक्ता में कॉलबैक का उपयोग कर सकते हैं। ऐसा करने के लिए, आपको केवल मॉड्यूल आयात करने और उनका उपयोग करने की आवश्यकता है। फिर, अपना UserConsumer
खोलें और इसे जोड़ें:
class UsersConsumer < ApplicationConsumer
include Karafka::Consumers::Callbacks
before_poll do
Karafka.logger.info "*** Checking something new for #{topic.name}"
end
after_poll do
Karafka.logger.info '*** We just checked for new messages!'
end
def consume
Karafka.logger.info "New [User] event: #{params.payload}"
User.create!(params.payload['user'])
end
end
पोल वह माध्यम है जिसके माध्यम से हम वर्तमान विभाजन ऑफ़सेट के आधार पर रिकॉर्ड प्राप्त करते हैं। तो, वे कॉलबैक before_poll
और after_poll
, जैसा कि उनके नाम से पता चलता है, उस समय निष्पादित किए जाते हैं। हम सिर्फ एक संदेश लॉग कर रहे हैं, और आप उन्हें अपने कराफ्का सर्वर में देख सकते हैं—एक लाने से पहले और दूसरा उसके बाद:
*** Checking something new for users
[[karafka_example] {}:] No batches to process
[[karafka_example] {users: 0}:] [fetch] Received response 325 from 192.168.1.77:9092
[[karafka_example] {users: 0}:] Fetching batches
[[karafka_example] {users: 0}:] [fetch] Sending fetch API request 326 to 192.168.1.77:9092
[[karafka_example] {users: 0}:] [fetch] Waiting for response 326 from 192.168.1.77:9092
*** We just checked for new messages!
दिल की धड़कन
दिल की धड़कन ठीक उसी तरह है जैसे हम, उपभोक्ता के रूप में, काफ्का से कहते हैं कि हम जीवित हैं; अन्यथा, काफ्का मान लेगा कि उपभोक्ता मर चुका है।
काराफ्का में, हमारे पास समय की अवधि में ऐसा करने के लिए एक डिफ़ॉल्ट कॉन्फ़िगरेशन है; यह kafka.heartbeat_interval
. है और डिफ़ॉल्ट 10 सेकंड है। आप इस दिल की धड़कन को अपने कराफ्का सर्वर में देख सकते हैं।
*** Checking something new for users
[[karafka_example_example] {}:] Sending heartbeat...
[[karafka_example_example] {}:] [heartbeat] Sending heartbeat API request 72 to 192.168.1.77:9092
[[karafka_example_example] {}:] [heartbeat] Waiting for response 72 from 192.168.1.77:9092
[[karafka_example_example] {}:] [heartbeat] Received response 72 from 192.168.1.77:9092
*** We just checked for new messages!
Sending heartbeat...
. के साथ , काफ्का जानता है कि हम जीवित हैं और हम इसके उपभोक्ता समूह के वैध सदस्य हैं। साथ ही, हम अधिक रिकॉर्ड का उपभोग कर सकते हैं।
प्रतिबद्ध करें
ऑफ़सेट को उपभोग के रूप में चिह्नित करना ऑफ़सेट करना कहलाता है। काफ्का में, हम एक आंतरिक काफ्का विषय पर लिखकर ऑफ़सेट कमिट रिकॉर्ड करते हैं जिसे ऑफ़सेट विषय कहा जाता है। किसी संदेश को केवल तभी उपभोग माना जाता है जब उसका ऑफ़सेट ऑफ़सेट विषय के लिए प्रतिबद्ध हो।
इस प्रतिबद्धता को हर बार स्वचालित रूप से करने के लिए काराफ्का के पास एक विन्यास है; विन्यास kafka.offset_commit_interval
. है , और इसका मान डिफ़ॉल्ट रूप से 10 सेकंड है। इसके साथ, करकफा हर 10 सेकंड में एक ऑफसेट कमिट करेगा, और आप उस संदेश को अपने कराफ्का सर्वर में देख सकते हैं:
*** Checking something new for users
[[karafka_example] {}:] No batches to process
[[karafka_example] {users: 0}:] [fetch] Received response 307 from 192.168.1.77:9092
[[karafka_example] {users: 0}:] Fetching batches
[[karafka_example] {users: 0}:] [fetch] Sending fetch API request 308 to 192.168.1.77:9092
[[karafka_example] {users: 0}:] [fetch] Waiting for response 308 from 192.168.1.77:9092
[[karafka_example] {}:] Committing offsets: users/0:44
[[karafka_example] {}:] [offset_commit] Sending offset_commit API request 69 to 192.168.1.77:9092
[[karafka_example] {}:] [offset_commit] Waiting for response 69 from 192.168.1.77:9092
[[karafka_example] {}:] [offset_commit] Received response 69 from 192.168.1.77:9092
*** We just checked for new messages!
Committing offsets: users/0:44
हमें बताएं कि यह कौन सा ऑफसेट कर रहा है; मेरे मामले में, उसने काफ्का को बताया कि वह विषय 0 से ऑफसेट नंबर 44 को कमिट कर सकता है। इस तरह, अगर हमारी सेवा के साथ कुछ होता है, तो काराफ्का उस ऑफसेट से घटनाओं को संसाधित करने के लिए फिर से शुरू कर सकता है।
निष्कर्ष
ईवेंट स्ट्रीमिंग हमें तेज़ होने, डेटा का बेहतर उपयोग करने और बेहतर उपयोगकर्ता अनुभव डिज़ाइन करने में मदद करती है। तथ्य की बात के रूप में, कई कंपनियां अपनी सभी सेवाओं को संप्रेषित करने और वास्तविक समय में विभिन्न घटनाओं पर प्रतिक्रिया करने में सक्षम होने के लिए इस पैटर्न का उपयोग कर रही हैं। जैसा कि मैंने पहले उल्लेख किया है, काराफ्का के अलावा अन्य विकल्प हैं जिनका उपयोग आप रेल के साथ कर सकते हैं। आपके पास पहले से ही मूल बातें हैं; अब, बेझिझक उनके साथ प्रयोग करें।
संदर्भ
- https://kafka.apache.org/
- https://github.com/karafka/karafka
- https://en.wikipedia.org/wiki/Publish%E2%80%93subscribe_pattern