Short description:

In this article I am going to create a simple Producer that publishes messages (tweets) to a Kafka topic.

Additionally, I am creating a simple Consumer that subscribes to the Kafka topic and reads the messages.


Create the Kafka topic:

./kafka-topics.sh --create --topic 'kafka-tweets' --partitions 3 --replication-factor 3 --zookeeper <zookeeper node:zk port>
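
To confirm the topic was created with the expected partition and replica counts, you can describe it using the same Zookeeper connection string (on newer Kafka versions the tooling expects --bootstrap-server instead of --zookeeper):

./kafka-topics.sh --describe --topic 'kafka-tweets' --zookeeper <zookeeper node:zk port>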


Install the necessary packages in your Python project's venv:

pip install kafka-python twython
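
If you do not have a venv yet, a minimal way to create and activate one (assuming Python 3 on Linux/macOS):

python3 -m venv venv
source venv/bin/activate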


Producer:

import json

from kafka import KafkaProducer
from twython import Twython


def main():

    # Load Twitter credentials from json file
    with open("twitter_credentials.json", "r") as file:
        creds = json.load(file)

    # Instantiate the Twython client
    python_tweets = Twython(creds['CONSUMER_KEY'], creds['CONSUMER_SECRET'])

    # Search query
    query = {'q': 'cloudera', 'result_type': 'mixed', 'count': 100}

    # result is a Python list of tweets (each tweet is a dict)
    result = python_tweets.search(**query)['statuses']

    injest_data(result)

To get access to the Twitter API I need to use my credentials, which are stored in "twitter_credentials.json".
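
For reference, "twitter_credentials.json" only needs the two keys the code reads; a minimal sketch with placeholder values:

{
    "CONSUMER_KEY": "<your Twitter app consumer key>",
    "CONSUMER_SECRET": "<your Twitter app consumer secret>"
}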

I then use Twython to search for 100 tweets that contain the word "cloudera".

The result is a Python list of tweet dicts that will be the input of injest_data(), where I connect to Kafka and send the messages to the "kafka-tweets" topic.

def injest_data(tweets):

    # Serialize each dict to a JSON string and encode to bytes via utf-8
    p = KafkaProducer(bootstrap_servers='<kafka-broker>:6667',
                      acks='all',
                      value_serializer=lambda m: json.dumps(m).encode('utf-8'),
                      batch_size=1024)

    for item in tweets:
        p.send('kafka-tweets', value=item)

    p.flush(100)
    p.close()
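
If you want per-message delivery confirmation instead of relying on the final flush(), kafka-python's send() returns a future you can block on. A minimal sketch of a hypothetical variant (injest_data_sync() is not part of the original script):

def injest_data_sync(tweets):
    # Hypothetical variant that waits for each record to be acknowledged by the broker
    p = KafkaProducer(bootstrap_servers='<kafka-broker>:6667',
                      acks='all',
                      value_serializer=lambda m: json.dumps(m).encode('utf-8'))

    for item in tweets:
        future = p.send('kafka-tweets', value=item)
        # Blocks until the broker acknowledges the record (raises on error/timeout)
        metadata = future.get(timeout=10)
        print("delivered to partition %d at offset %d" % (metadata.partition, metadata.offset))

    p.close()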


Consumer:

import json

from kafka import KafkaConsumer


def consume():
    # Consume the latest messages and deserialize each value from utf-8 encoded JSON
    consumer = KafkaConsumer('kafka-tweets',
                             bootstrap_servers=['<kafka-broker>:6667'],
                             value_deserializer=lambda m: json.loads(m.decode('utf-8')),
                             consumer_timeout_ms=10000)

    for message in consumer:
        # The value is already deserialized to a dict; the key is still raw bytes (None here)
        print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                             message.offset, message.key,
                                             message.value))

    consumer.close()

We are subscribing to the "kafka-tweets" topic and then reading the messages. Because consumer_timeout_ms=10000 is set, the loop exits after 10 seconds without new messages.
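
Since no group_id is set, this consumer does not commit offsets, so it will not resume where it left off across restarts. A sketch of a hypothetical variant that joins a consumer group (the group name is arbitrary):

def consume_with_group():
    # Hypothetical variant: join a consumer group so offsets are committed
    # and consumption resumes from the last committed offset after a restart
    consumer = KafkaConsumer('kafka-tweets',
                             bootstrap_servers=['<kafka-broker>:6667'],
                             group_id='tweets-consumer-group',  # hypothetical group name
                             enable_auto_commit=True,
                             auto_offset_reset='earliest',
                             value_deserializer=lambda m: json.loads(m.decode('utf-8')),
                             consumer_timeout_ms=10000)

    for message in consumer:
        print("%s:%d:%d: value=%s" % (message.topic, message.partition,
                                      message.offset, message.value))

    consumer.close()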


Output (1 message):

tweets:0:484: key=None value={u'contributors': None, u'truncated': True, u'text': u'Urgent Requirement for an Infrastructure &amp; Platform Engineer to work with one of our top financial clients!!\nApply\u2026 https://t.co/3pGFbOASGj', u'is_quote_status': False, u'in_reply_to_status_id': None, u'id': 1124041974875664390, u'favorite_count': 1, u'source': u'<a href="http://twitter.com" rel="nofollow">Twitter Web Client</a>', u'retweeted': False, u'coordinates': None, u'entities': {u'symbols': [], u'user_mentions': [], u'hashtags': [], u'urls': [{u'url': u'https://t.co/3pGFbOASGj', u'indices': [120, 143], u'expanded_url': u'https://twitter.com/i/web/status/1124041974875664390', u'display_url': u'twitter.com/i/web/status/1\u2026'}]}, u'in_reply_to_screen_name': None, u'id_str': u'1124041974875664390', u'retweet_count': 5, u'in_reply_to_user_id': None, u'favorited': False, u'user': {u'follow_request_sent': None, u'has_extended_profile': False, u'profile_use_background_image': False, u'time_zone': None, u'id': 89827370, u'default_profile': False, u'verified': False, u'profile_text_color': u'000000', u'profile_image_url_https': u'https://pbs.twimg.com/profile_images/644912966698037249/unhNPWuL_normal.png', u'profile_sidebar_fill_color': u'000000', u'is_translator': False, u'geo_enabled': True, u'entities': {u'url': {u'urls': [{u'url': u'http://t.co/OJFHBaiwWO', u'indices': [0, 22], u'expanded_url': u'http://www.beach-head.com', u'display_url': u'beach-head.com'}]}, u'description': {u'urls': []}}, u'followers_count': 82, u'protected': False, u'id_str': u'89827370', u'default_profile_image': False, u'listed_count': 8, u'lang': u'en', u'utc_offset': None, u'statuses_count': 2508, u'description': u'Beachhead is a Premier IT recruiting firm based in Toronto, Canada. Follow for exciting opportunities in Financial, Retail and Telecommunication sector.\U0001f600', u'friends_count': 59, u'profile_link_color': u'0570B3', u'profile_image_url': u'http://pbs.twimg.com/profile_images/644912966698037249/unhNPWuL_normal.png', u'notifications': None, u'profile_background_image_url_https': u'https://abs.twimg.com/images/themes/theme1/bg.png', u'profile_background_color': u'000000', u'profile_banner_url': u'https://pbs.twimg.com/profile_banners/89827370/1442594156', u'profile_background_image_url': u'http://abs.twimg.com/images/themes/theme1/bg.png', u'name': u'BeachHead', u'is_translation_enabled': False, u'profile_background_tile': False, u'favourites_count': 19, u'screen_name': u'BeachHeadINC', u'url': u'http://t.co/OJFHBaiwWO', u'created_at': u'Sat Nov 14 00:02:15 +0000 2009', u'contributors_enabled': False, u'location': u'Toronto, Canada', u'profile_sidebar_border_color': u'000000', u'translator_type': u'none', u'following': None}, u'geo': None, u'in_reply_to_user_id_str': None, u'possibly_sensitive': False, u'lang': u'en', u'created_at': u'Thu May 02 20:04:25 +0000 2019', u'in_reply_to_status_id_str': None, u'place': None, u'metadata': {u'iso_language_code': u'en', u'result_type': u'recent'}}


Code available at:

https://github.com/PedroAndrade89/kafka_twitter




