
Short description:

In this article I create a simple producer that publishes messages (tweets) to a Kafka topic.

I also create a simple consumer that subscribes to the Kafka topic and reads the messages.

Create the Kafka topic:

./ --create --topic 'kafka-tweets' --partitions 3 --replication-factor 3 --zookeeper <zookeeper node:zk port>
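
As an alternative to the shell command, the topic could also be created from Python. This is only a sketch: it assumes a kafka-python release that ships kafka.admin (KafkaAdminClient and NewTopic) and brokers recent enough to accept admin requests. topic_settings() and create_topic() are hypothetical helper names, and the broker address is a placeholder.

```python
def topic_settings():
    """Settings matching the command above: 3 partitions, replication factor 3."""
    return {"name": "kafka-tweets", "num_partitions": 3, "replication_factor": 3}


def create_topic(bootstrap_servers):
    """Create the topic through the brokers instead of ZooKeeper (hypothetical helper)."""
    # Imported here so topic_settings() works even without kafka-python installed.
    from kafka.admin import KafkaAdminClient, NewTopic

    admin = KafkaAdminClient(bootstrap_servers=bootstrap_servers)
    try:
        admin.create_topics(new_topics=[NewTopic(**topic_settings())])
    finally:
        admin.close()


# Usage (placeholder address, replace it with a real broker):
# create_topic("<kafka-broker>:6667")
```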

Install the necessary packages in your Python project's venv:

pip install kafka-python twython


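import json

from twython import Twython


def main():

    # Load credentials from json file
    with open("twitter_credentials.json", "r") as file:
        creds = json.load(file)

    # Instantiate the Twython client with the consumer key and secret
    python_tweets = Twython(creds['CONSUMER_KEY'], creds['CONSUMER_SECRET'])

    # Search query: up to 100 tweets that match "cloudera"
    query = {'q': 'cloudera', 'result_type': 'mixed', 'count': 100}

    # result is a Python list of tweets, each one a dict
    result = python_tweets.search(**query)['statuses']

    # Publish the tweets to Kafka
    injest_data(result)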


To get access to the Twitter API I need my credentials, which are stored in "twitter_credentials.json".
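
For reference, a minimal sketch of how such a file could be produced. Only the two key names come from the code above; the placeholder values and the write_credentials() helper are assumptions for illustration.

```python
import json
import os
import tempfile


def write_credentials(path, consumer_key, consumer_secret):
    """Write the two keys that main() reads (hypothetical helper)."""
    creds = {"CONSUMER_KEY": consumer_key, "CONSUMER_SECRET": consumer_secret}
    with open(path, "w") as file:
        json.dump(creds, file, indent=2)


# Demonstrate in a temporary directory with placeholder values.
demo_path = os.path.join(tempfile.mkdtemp(), "twitter_credentials.json")
write_credentials(demo_path, "<your consumer key>", "<your consumer secret>")

# Read the file back the same way main() does.
with open(demo_path, "r") as file:
    loaded = json.load(file)

print(sorted(loaded))  # ['CONSUMER_KEY', 'CONSUMER_SECRET']
```

Keep the real file out of version control, since it holds secrets.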

I then use twython to search for up to 100 tweets that contain the word "cloudera".

The result is a Python list of dicts, one per tweet. It is the input of injest_data(), where I connect to Kafka and send the messages to the topic "kafka-tweets".
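
Each tweet in the result is a nested dict. As a rough illustration of its shape, the sketch below pulls a few fields out of a hand-written sample. The sample only mimics the structure of a search result (its values are borrowed from the output shown at the end of this article), and summarize() is a hypothetical helper.

```python
def summarize(tweet):
    """Return a small dict with a few fields of one tweet (hypothetical helper)."""
    return {
        "id": tweet["id"],
        "screen_name": tweet["user"]["screen_name"],
        "text": tweet["text"],
    }


# Hand-written sample that mimics the structure of the result; not real API output.
sample_result = [
    {
        "id": 1124041974875664390,
        "text": "Urgent Requirement for an Infrastructure & Platform Engineer",
        "user": {"screen_name": "BeachHeadINC"},
    }
]

summaries = [summarize(tweet) for tweet in sample_result]
print(summaries[0]["screen_name"])  # BeachHeadINC
```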

from kafka import KafkaProducer

def injest_data(tweets):
    # serialize each dict to a JSON string and encode it to bytes via utf-8
    p = KafkaProducer(bootstrap_servers='<kafka-broker>:6667', acks='all',
                      value_serializer=lambda m: json.dumps(m).encode('utf-8'),
                      batch_size=1024)

    for item in tweets:
        p.send('kafka-tweets', value=item)

    # send() is asynchronous; block until all buffered messages are delivered
    p.flush()
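
To see what the value_serializer does to a single message, and how the consumer side can reverse it, here is a small sketch that uses only the standard library. The sample dict is made up for the example.

```python
import json


def serialize(message):
    """Same transformation as the producer's value_serializer."""
    return json.dumps(message).encode('utf-8')


def deserialize(raw):
    """Inverse transformation, as used on the consumer side."""
    return json.loads(raw.decode('utf-8'))


sample = {"id": 1, "text": "hello cloudera"}  # made-up message

payload = serialize(sample)
print(type(payload).__name__)          # bytes
print(deserialize(payload) == sample)  # True
```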



from kafka import KafkaConsumer


def consume():
    # Consume the latest messages and deserialize each value from utf-8 JSON bytes;
    # stop iterating after 10 seconds without new messages
    consumer = KafkaConsumer('kafka-tweets',
                             bootstrap_servers=['<kafka-broker>:6667'],
                             value_deserializer=lambda m: json.loads(m.decode('utf-8')),
                             consumer_timeout_ms=10000)

    for message in consumer:
        # message.value is already deserialized to a dict; message.key is raw bytes (None here)
        print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                             message.offset, message.key,
                                             message.value))


The consumer subscribes to the "kafka-tweets" topic and then reads the messages.
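
To try the print format without a running broker, a stand-in record can be used. The Record namedtuple below is an assumption that only mimics the five attributes read in consume(); it is not the record class that kafka-python returns, and format_record() is a hypothetical helper.

```python
from collections import namedtuple

# Stand-in with the five attributes that consume() reads (assumption, not the library class).
Record = namedtuple("Record", ["topic", "partition", "offset", "key", "value"])


def format_record(message):
    """Build the same line that consume() prints for each message."""
    return "%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                          message.offset, message.key,
                                          message.value)


fake = Record("kafka-tweets", 0, 484, None, {"id": 1})
line = format_record(fake)
print(line)
```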

Output (1 message):

tweets:0:484: key=None value={u'contributors': None, u'truncated': True, u'text': u'Urgent Requirement for an Infrastructure &amp; Platform Engineer to work with one of our top financial clients!!\nApply\u2026', u'is_quote_status': False, u'in_reply_to_status_id': None, u'id': 1124041974875664390, u'favorite_count': 1, u'source': u'<a href="" rel="nofollow">Twitter Web Client</a>', u'retweeted': False, u'coordinates': None, u'entities': {u'symbols': [], u'user_mentions': [], u'hashtags': [], u'urls': [{u'url': u'', u'indices': [120, 143], u'expanded_url': u'', u'display_url': u'\u2026'}]}, u'in_reply_to_screen_name': None, u'id_str': u'1124041974875664390', u'retweet_count': 5, u'in_reply_to_user_id': None, u'favorited': False, u'user': {u'follow_request_sent': None, u'has_extended_profile': False, u'profile_use_background_image': False, u'time_zone': None, u'id': 89827370, u'default_profile': False, u'verified': False, u'profile_text_color': u'000000', u'profile_image_url_https': u'', u'profile_sidebar_fill_color': u'000000', u'is_translator': False, u'geo_enabled': True, u'entities': {u'url': {u'urls': [{u'url': u'', u'indices': [0, 22], u'expanded_url': u'', u'display_url': u''}]}, u'description': {u'urls': []}}, u'followers_count': 82, u'protected': False, u'id_str': u'89827370', u'default_profile_image': False, u'listed_count': 8, u'lang': u'en', u'utc_offset': None, u'statuses_count': 2508, u'description': u'Beachhead is a Premier IT recruiting firm based in Toronto, Canada. 
Follow for exciting opportunities in Financial, Retail and Telecommunication sector.\U0001f600', u'friends_count': 59, u'profile_link_color': u'0570B3', u'profile_image_url': u'', u'notifications': None, u'profile_background_image_url_https': u'', u'profile_background_color': u'000000', u'profile_banner_url': u'', u'profile_background_image_url': u'', u'name': u'BeachHead', u'is_translation_enabled': False, u'profile_background_tile': False, u'favourites_count': 19, u'screen_name': u'BeachHeadINC', u'url': u'', u'created_at': u'Sat Nov 14 00:02:15 +0000 2009', u'contributors_enabled': False, u'location': u'Toronto, Canada', u'profile_sidebar_border_color': u'000000', u'translator_type': u'none', u'following': None}, u'geo': None, u'in_reply_to_user_id_str': None, u'possibly_sensitive': False, u'lang': u'en', u'created_at': u'Thu May 02 20:04:25 +0000 2019', u'in_reply_to_status_id_str': None, u'place': None, u'metadata': {u'iso_language_code': u'en', u'result_type': u'recent'}}

Code available in:
