Created on 05-02-2019 08:58 PM
In this article I am going to create a simple Producer to publish messages(tweets) to a kafka topic.
Additionally I'm also creating a simple Consumer that subscribes to the kafka topic and reads the messages
./kafka-topics.sh --create --topic 'kafka-tweets' --partitions 3 --replication-factor 3 --zookeeper <zookeeper node:zk port>
pip install kafka-python twython
def main(): # Load credentials from json file with open("twitter_credentials.json", "r") as file: creds = json.load(file) # Instantiate python_tweets = Twython(creds['CONSUMER_KEY'], creds['CONSUMER_SECRET']) # search query query = {'q': 'cloudera', 'result_type': 'mixed', 'count': 100} #result is a python dict of tweets result = python_tweets.search(**query)['statuses'] injest_data(result)
To get access to twitter API I need to use my credentials which are stored in "twitter_credentials.json".
I then use twython to search for 100 tweets that contain word "cloudera"
The result is a python dict, that will be the input of injest_data() were I will be connecting to kafka and then send messages to topic "kafka-tweets"
def injest_data(list): #serialize dict to string via json and encode to bytes via utf-8 p = KafkaProducer(bootstrap_servers='<kafka-broker>:6667', acks='all',value_serializer=lambda m: json.dumps(m).encode('utf-8'), batch_size=1024) for item in list: p.send('kafka-tweets', value=item) p.flush(100) p.close()
def consume(): # To consume latest messages and auto-commit offsets and also decode from raw bytes to utf-8 consumer = KafkaConsumer('kafka-tweets', bootstrap_servers=['<kafka-broker>:6667'],value_deserializer=lambda m: json.loads(m.decode('utf-8')),consumer_timeout_ms=10000) for message in consumer: # message value and key are raw bytes -- need to decode print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition, message.offset, message.key, message.value)) consumer.close()
We are subscribing to "kafka-tweets" topic and then reading the messages
tweets:0:484: key=None value={u'contributors': None, u'truncated': True, u'text': u'Urgent Requirement for an Infrastructure & Platform Engineer to work with one of our top financial clients!!\nApply\u2026 https://t.co/3pGFbOASGj', u'is_quote_status': False, u'in_reply_to_status_id': None, u'id': 1124041974875664390, u'favorite_count': 1, u'source': u'<a href="http://twitter.com" rel="nofollow">Twitter Web Client</a>', u'retweeted': False, u'coordinates': None, u'entities': {u'symbols': [], u'user_mentions': [], u'hashtags': [], u'urls': [{u'url': u'https://t.co/3pGFbOASGj', u'indices': [120, 143], u'expanded_url': u'https://twitter.com/i/web/status/1124041974875664390', u'display_url': u'twitter.com/i/web/status/1\u2026'}]}, u'in_reply_to_screen_name': None, u'id_str': u'1124041974875664390', u'retweet_count': 5, u'in_reply_to_user_id': None, u'favorited': False, u'user': {u'follow_request_sent': None, u'has_extended_profile': False, u'profile_use_background_image': False, u'time_zone': None, u'id': 89827370, u'default_profile': False, u'verified': False, u'profile_text_color': u'000000', u'profile_image_url_https': u'https://pbs.twimg.com/profile_images/644912966698037249/unhNPWuL_normal.png', u'profile_sidebar_fill_color': u'000000', u'is_translator': False, u'geo_enabled': True, u'entities': {u'url': {u'urls': [{u'url': u'http://t.co/OJFHBaiwWO', u'indices': [0, 22], u'expanded_url': u'http://www.beach-head.com', u'display_url': u'beach-head.com'}]}, u'description': {u'urls': []}}, u'followers_count': 82, u'protected': False, u'id_str': u'89827370', u'default_profile_image': False, u'listed_count': 8, u'lang': u'en', u'utc_offset': None, u'statuses_count': 2508, u'description': u'Beachhead is a Premier IT recruiting firm based in Toronto, Canada. Follow for exciting opportunities in Financial, Retail and Telecommunication sector.\U0001f600', u'friends_count': 59, u'profile_link_color': u'0570B3', u'profile_image_url': u'http://pbs.twimg.com/profile_images/644912966698037249/unhNPWuL_normal.png', u'notifications': None, u'profile_background_image_url_https': u'https://abs.twimg.com/images/themes/theme1/bg.png', u'profile_background_color': u'000000', u'profile_banner_url': u'https://pbs.twimg.com/profile_banners/89827370/1442594156', u'profile_background_image_url': u'http://abs.twimg.com/images/themes/theme1/bg.png', u'name': u'BeachHead', u'is_translation_enabled': False, u'profile_background_tile': False, u'favourites_count': 19, u'screen_name': u'BeachHeadINC', u'url': u'http://t.co/OJFHBaiwWO', u'created_at': u'Sat Nov 14 00:02:15 +0000 2009', u'contributors_enabled': False, u'location': u'Toronto, Canada', u'profile_sidebar_border_color': u'000000', u'translator_type': u'none', u'following': None}, u'geo': None, u'in_reply_to_user_id_str': None, u'possibly_sensitive': False, u'lang': u'en', u'created_at': u'Thu May 02 20:04:25 +0000 2019', u'in_reply_to_status_id_str': None, u'place': None, u'metadata': {u'iso_language_code': u'en', u'result_type': u'recent'}}
https://github.com/PedroAndrade89/kafka_twitter