How to Create a Real-Time Twitter Pipeline Using Kafka, Tweepy, and Python in a Jupyter Notebook (Cloudera)

In this section you will learn how to create a real-time Twitter pipeline using Kafka, Tweepy, and Python in a Jupyter notebook on Cloudera.

1: The Kafka cluster should be up and running.

[Screenshot: Confluent Kafka dashboard]
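Before wiring up Twitter, it can help to confirm that the broker port is reachable from the machine that will run the Python producer. The following is a minimal sketch using only the standard library; it only checks that a TCP connection can be opened, not that Kafka itself is healthy, and the localhost:9092 address is an assumption you should replace with your own bootstrap server.

```python
import socket


def broker_reachable(host: str, port: int, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout seconds."""
    try:
        # create_connection resolves the host and opens a TCP socket
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers connection refused, timeouts, and DNS failures
        return False


if __name__ == "__main__":
    # Assumed broker address; use the one from your own cluster
    print(broker_reachable("localhost", 9092))
```

A False result usually means the broker is down, listening on a different address, or blocked by a firewall.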

2: Get a Twitter developer app with its consumer key, consumer secret, access token, and access token secret.

[Screenshot: Twitter secret key and token key for the Kafka pipeline]
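Rather than pasting these four keys directly into the notebook, you can keep them in environment variables. Here is a minimal sketch of that approach; the variable names TWITTER_CONSUMER_KEY and so on are hypothetical, so choose whatever names suit your setup.

```python
import os
from typing import Dict, Mapping

# Hypothetical environment variable names; rename them as you like.
ENV_VARS = {
    "consumer_key": "TWITTER_CONSUMER_KEY",
    "consumer_secret": "TWITTER_CONSUMER_SECRET",
    "access_token": "TWITTER_ACCESS_TOKEN",
    "access_token_secret": "TWITTER_ACCESS_TOKEN_SECRET",
}


def load_twitter_credentials(environ: Mapping[str, str] = os.environ) -> Dict[str, str]:
    """Read the four Twitter credentials, raising if any is missing or empty."""
    missing = [var for var in ENV_VARS.values() if not environ.get(var)]
    if missing:
        raise RuntimeError("Missing environment variables: " + ", ".join(missing))
    return {key: environ[var] for key, var in ENV_VARS.items()}
```

With creds = load_twitter_credentials(), the authentication step becomes tweepy.OAuthHandler(creds["consumer_key"], creds["consumer_secret"]) followed by auth.set_access_token(creds["access_token"], creds["access_token_secret"]), and the secrets never end up in the notebook file.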

3: Install Python, either on its own or together with Jupyter Notebook.

[Screenshot: Jupyter Notebook for the Kafka pipeline]

4: Install the following Python packages as well:

pip install tweepy==3.3.0
pip install kafka-python
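The Kafka client imported with "from kafka import ..." is distributed on PyPI as kafka-python. To double-check which versions actually landed in your environment (handy in Jupyter, where the notebook kernel may use a different interpreter than your shell), a small standard-library sketch could look like this; it assumes Python 3.8 or later for importlib.metadata.

```python
from importlib.metadata import PackageNotFoundError, version


def installed_versions(distributions):
    """Return {distribution name: installed version string, or None if not installed}."""
    found = {}
    for name in distributions:
        try:
            found[name] = version(name)
        except PackageNotFoundError:
            found[name] = None
    return found


if __name__ == "__main__":
    # Distribution names as they appear on PyPI
    for name, ver in installed_versions(["tweepy", "kafka-python"]).items():
        print(f"{name}: {ver or 'NOT INSTALLED'}")
```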

                
               

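Now open a terminal and start the Kafka console consumer. Replace 159.203.83.126:9092 with your own broker address; it must point at the same cluster the Python producer writes to.

[Screenshot: starting the Kafka consumer]

/root/confluent-5.1.2/bin/kafka-console-consumer --bootstrap-server 159.203.83.126:9092 --topic twitterer_data --from-beginning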
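If you would rather read the topic from Python than from the console consumer, here is a minimal sketch using kafka-python's KafkaConsumer. It assumes the record layout the producer in this tutorial builds (user id; timestamp; followers; location; favorites; retweets, separated by semicolons); the TweetRecord, parse_record, and consume names are hypothetical.

```python
from typing import NamedTuple, Optional


class TweetRecord(NamedTuple):
    user_id: str
    created_at: str
    followers_count: int
    location: str
    favorite_count: int
    retweet_count: int


def parse_record(raw: bytes) -> Optional[TweetRecord]:
    """Parse one 'id;time;followers;location;favorites;retweets;' message.

    Returns None if the message has too few fields.
    """
    parts = raw.decode("utf-8").rstrip(";").split(";")
    if len(parts) < 6:
        return None
    # The location is free text and may itself contain ';', so take the
    # fixed fields from both ends and rejoin whatever is left in the middle.
    user_id, created_at, followers = parts[0], parts[1], parts[2]
    favorites, retweets = parts[-2], parts[-1]
    location = ";".join(parts[3:-2])
    return TweetRecord(user_id, created_at, int(followers), location,
                       int(favorites), int(retweets))


def consume(bootstrap_servers="localhost:9092", topic="twitterer_data"):
    """Print parsed records, starting from the earliest available offset."""
    from kafka import KafkaConsumer  # kafka-python, imported only when needed

    consumer = KafkaConsumer(
        topic,
        bootstrap_servers=bootstrap_servers,
        auto_offset_reset="earliest",  # with no group_id, similar to --from-beginning
    )
    for message in consumer:
        record = parse_record(message.value)
        if record is not None:
            print(record)
```

Call consume() with your broker address to start printing records; it blocks and loops until you interrupt it. Splitting the parsing into its own function keeps it testable without a running broker.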

                
               

Here is the full Python code. You can save it as a .py file or run it in a Jupyter notebook:

               
import time
from datetime import datetime, timedelta

import tweepy  # tweepy 3.x, where api.search() is available
from kafka import KafkaProducer  # provided by the kafka-python package

# Twitter setup: replace these placeholders with your own app credentials
consumer_key = "YOUR_CONSUMER_KEY"
consumer_secret = "YOUR_CONSUMER_SECRET"
access_token = "YOUR_ACCESS_TOKEN"
access_token_secret = "YOUR_ACCESS_TOKEN_SECRET"
# Creating the authentication object
auth = tweepy.OAuthHandler(consumer_key, consumer_secret)
# Setting your access token and secret
auth.set_access_token(access_token, access_token_secret)
# Creating the API object by passing in auth information
api = tweepy.API(auth)


def normalize_timestamp(timestamp):
    mytime = datetime.strptime(timestamp, "%Y-%m-%d %H:%M:%S")
    mytime += timedelta(hours=1)   # the tweets are timestamped in GMT, while I am in GMT+1
    return mytime.strftime("%Y-%m-%d %H:%M:%S")


# Must point at the same Kafka cluster the console consumer reads from
producer = KafkaProducer(bootstrap_servers='localhost:9092')
topic_name = 'twitterer_data'

def get_twitter_data():
    res = api.search("Apple OR iphone OR iPhone")
    for i in res:
        # One semicolon-separated record per tweet:
        # user id; local timestamp; followers; location; favorites; retweets;
        fields = [
            i.user.id_str,
            normalize_timestamp(str(i.created_at)),
            i.user.followers_count,
            i.user.location,
            i.favorite_count,
            i.retweet_count,
        ]
        record = ';'.join(str(f) for f in fields) + ';'
        producer.send(topic_name, record.encode('utf-8'))
    # Block until all buffered messages have been delivered to Kafka
    producer.flush()


def periodic_work(interval):
    while True:
        get_twitter_data()
        # interval is the number of seconds to wait between searches
        time.sleep(interval)

# periodic_work() fetches immediately, then repeats after each interval
periodic_work(6)  # fetch new tweets every 6 seconds
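The normalize_timestamp function above shifts every tweet by a fixed hour, which drifts by an hour whenever daylight saving time applies. A possible alternative, sketched below under the assumption (stated in the original comment) that created_at is in GMT/UTC, converts the datetime tweepy returns directly into any tzinfo you pass in. The name normalize_timestamp_tz is hypothetical.

```python
from datetime import datetime, timedelta, timezone, tzinfo


def normalize_timestamp_tz(created_at: datetime, local_tz: tzinfo) -> str:
    """Convert a tweet's created_at (UTC) to local_tz, in the same string format."""
    if created_at.tzinfo is None:
        # Treat naive datetimes as UTC, matching the GMT timestamps on tweets
        created_at = created_at.replace(tzinfo=timezone.utc)
    return created_at.astimezone(local_tz).strftime("%Y-%m-%d %H:%M:%S")


if __name__ == "__main__":
    gmt_plus_1 = timezone(timedelta(hours=1))  # fixed offset, same as the +1 hour above
    print(normalize_timestamp_tz(datetime(2019, 1, 15, 23, 30), gmt_plus_1))
    # 2019-01-16 00:30:00
```

Inside get_twitter_data you could then call normalize_timestamp_tz(i.created_at, local_tz) instead of round-tripping through str() and strptime. On Python 3.9 or later, passing zoneinfo.ZoneInfo("Europe/Paris") (an example zone; substitute your own) handles daylight saving changes automatically, which a fixed offset cannot.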
                
               

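Run the Jupyter notebook (or the .py file), then check the consumer terminal.

[Screenshot: running the Jupyter notebook that feeds Kafka]

The tweet records now appear in the consumer terminal:

[Screenshot: Twitter data arriving in the Kafka consumer]
If you want to know more, check out my YouTube channel.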