The following imports, connection settings, and helper functions are used throughout this segment of the ETL pipeline:
import os
import json
import numpy
import datetime
import pandas as pd
import csv
import requests
import requests.exceptions
import pymongo
from sqlalchemy import create_engine
host_name = "localhost"
ports = {"mongo" : 27017, "mysql" : 3306}
user_id = ""
pwd = ""
src_dbname = "s&p500_twitter"
dst_dbname = "stocks_and_tweets"
conn_str = f"mongodb://{host_name}:{ports['mongo']}/"
client = pymongo.MongoClient(conn_str)
def get_sql_dataframe(user_id, pwd, host_name, db_name, sql_query):
    # Create a connection to the MySQL database
    conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}/{db_name}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    # Invoke pd.read_sql() to query the database and fill a Pandas DataFrame
    conn = sqlEngine.connect()
    dframe = pd.read_sql(sql_query, conn)
    conn.close()
    return dframe
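For dynamic values, binding parameters is safer than interpolating them into the SQL string. A minimal sketch (not part of the pipeline, and assuming the destination schema already exists) using `sqlalchemy.text()` with a hypothetical ticker filter:

```python
# Hedged sketch: pass bound parameters to pd.read_sql() instead of f-strings.
from sqlalchemy import create_engine, text

engine = create_engine(f"mysql+pymysql://{user_id}:{pwd}@{host_name}/{dst_dbname}")
with engine.connect() as conn:
    df_sample = pd.read_sql(text("SELECT * FROM stock_data WHERE ticker = :tkr"),
                            conn, params={"tkr": "AAL"})
```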
def get_mongo_dataframe(user_id, pwd, host_name, port, db_name, collection, query):
    # Create a connection to MongoDB, with or without authentication credentials
    if user_id and pwd:
        mongo_uri = f"mongodb://{user_id}:{pwd}@{host_name}:{port}/{db_name}"
        client = pymongo.MongoClient(mongo_uri)
    else:
        conn_str = f"mongodb://{host_name}:{port}/"
        client = pymongo.MongoClient(conn_str)
    # Query MongoDB, and fill a Python list with documents to create a DataFrame
    db = client[db_name]
    dframe = pd.DataFrame(list(db[collection].find(query)))
    dframe.drop(['_id'], axis=1, inplace=True)
    client.close()
    return dframe
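The `query` argument is a standard MongoDB filter document, so the helper can pull a subset instead of an entire collection. A minimal sketch with a hypothetical filter (runnable only after the collections are loaded below):

```python
# Hypothetical example: fetch only the AAL documents from the stock collection.
aal_filter = {"Name": "AAL"}
df_aal_only = get_mongo_dataframe(None, None, host_name, ports["mongo"],
                                  src_dbname, "stock_data", aal_filter)
```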
def set_dataframe(user_id, pwd, host_name, db_name, df, table_name, pk_column, db_operation):
    # Create a connection to the MySQL database
    conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}/{db_name}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    # Invoke DataFrame.to_sql() to either create, or append to, a table
    if db_operation == "insert":
        df.to_sql(table_name, con=connection, index=False, if_exists='replace')
        # Engine.execute() is the SQLAlchemy 1.x API
        sqlEngine.execute(f"ALTER TABLE {table_name} ADD PRIMARY KEY ({pk_column});")
    elif db_operation == "update":
        df.to_sql(table_name, con=connection, index=False, if_exists='append')
    connection.close()
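The `insert` branch rebuilds the table from scratch (`if_exists='replace'` drops and recreates it, which is why the primary key must be re-added each time), while `update` appends rows and assumes the key already exists. A hedged sketch of an append call, with a hypothetical `df_new_rows` frame:

```python
# Hypothetical append: df_new_rows would hold additional rows with unique keys.
# set_dataframe(user_id, pwd, host_name, dst_dbname, df_new_rows,
#               'stock_data', 'stock_id', 'update')
```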
The following section deals with CSV and JSON files. Helpers exist to convert in either direction, but for the sake of this project the source files "all_stocks_5yr.csv" and "Tweets.csv" are stored in the GitHub repo alongside this page, converted to JSON, and used from there.
try:
    data_dir = os.path.join(os.getcwd())
    data_file = os.path.join(data_dir, 'all_stocks_5yr.csv')
    df = pd.read_csv(data_file, header=0, index_col=0)
    df.head(2)
except Exception as e:
    print(f"Error loading all_stocks_5yr.csv: {e}")
try:
    data_dir = os.path.join(os.getcwd())
    data_file = os.path.join(data_dir, 'Tweets.csv')
    df = pd.read_csv(data_file, header=0, index_col=0)
    df.head(2)
except Exception as e:
    print(f"Error loading Tweets.csv: {e}")
# The code below is adapted from https://pythonexamples.org/python-csv-to-json/
def csv_to_json(csvFilePath, jsonFilePath):
    jsonArray = []
    # Read the CSV file using the csv library's dictionary reader
    with open(csvFilePath, encoding='utf-8') as csvf:
        csvReader = csv.DictReader(csvf)
        # Convert each CSV row into a Python dict and append it to the array
        for row in csvReader:
            jsonArray.append(row)
    # Serialize the array to a JSON string and write it to file
    with open(jsonFilePath, 'w', encoding='utf-8') as jsonf:
        jsonString = json.dumps(jsonArray, indent=4)
        jsonf.write(jsonString)
try:
    csvFilePath = r'all_stocks_5yr.csv'
    jsonFilePath = r'all_stocks_5yr.json'
    csv_to_json(csvFilePath, jsonFilePath)
except Exception as e:
    print(f"Error converting all_stocks_5yr.csv to JSON file: {e}")
try:
    csvFilePath = r'Tweets.csv'
    jsonFilePath = r'tweets.json'
    csv_to_json(csvFilePath, jsonFilePath)
except Exception as e:
    print(f"Error converting Tweets.csv to JSON file: {e}")
def json_to_csv(jsonFilePath, csvFilePath):
    # Use pandas for the conversion; 'infile' avoids shadowing the built-in input()
    with open(jsonFilePath, encoding='utf-8') as infile:
        df = pd.read_json(infile)
    df.to_csv(csvFilePath, encoding='utf-8', index=False)
try:
    csvFilePath = r'all_stocks_5yr.csv'
    jsonFilePath = r'all_stocks_5yr.json'
    json_to_csv(jsonFilePath, csvFilePath)
    csvFilePath = r'Tweets.csv'
    jsonFilePath = r'tweets.json'
    json_to_csv(jsonFilePath, csvFilePath)
except Exception as e:
    print(f"Error converting from JSON to CSV: {e}")
conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}"
sqlEngine = create_engine(conn_str, pool_recycle=3600)
sqlEngine.execute(f"DROP DATABASE IF EXISTS `{dst_dbname}`;")
sqlEngine.execute(f"CREATE DATABASE `{dst_dbname}`;")
sqlEngine.execute(f"USE {dst_dbname};")
db_name = "stocks_and_tweets"
port = ports["mongo"]
conn_str = f"mongodb://{host_name}:{port}/"
client = pymongo.MongoClient(conn_str)
db = client[src_dbname]
data_dir = os.path.join(os.getcwd())
json_files = {"twitter_sentiment" : 'tweets.json',
"stock_data" : 'all_stocks_5yr.json'
}
for collection_name, file_name in json_files.items():
    json_file = os.path.join(data_dir, file_name)
    with open(json_file, 'r') as openfile:
        json_object = json.load(openfile)
    # Insert the list of documents into the matching MongoDB collection
    collection = db[collection_name]
    result = collection.insert_many(json_object)
    #print(f"{collection_name} was successfully loaded.")
client.close()
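An optional verification step (assuming the loop above completed) reopens the connection and counts the documents in each collection:

```python
# Optional check: confirm each collection received its documents.
client = pymongo.MongoClient(conn_str)
db = client[src_dbname]
for collection_name in json_files:
    print(collection_name, db[collection_name].count_documents({}))
client.close()
```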
query = {}
port = ports["mongo"]
collection = "stock_data"
df_stock_data = get_mongo_dataframe(None, None, host_name, port, src_dbname, collection, query)
df_stock_data.head(2)
# Large data set... this may take a second or two
|   | date | open | high | low | close | volume | Name |
|---|------|------|------|-----|-------|--------|------|
| 0 | 2013-02-08 | 15.07 | 15.12 | 14.63 | 14.75 | 8407500 | AAL |
| 1 | 2013-02-11 | 14.89 | 15.01 | 14.26 | 14.46 | 8882000 | AAL |
query = {}
port = ports["mongo"]
collection = "twitter_sentiment"
df_tweets = get_mongo_dataframe(None, None, host_name, port, src_dbname, collection, query)
df_tweets.head(2)
|   | tweet_id | airline_sentiment | airline_sentiment_confidence | negativereason | negativereason_confidence | airline | airline_sentiment_gold | name | negativereason_gold | retweet_count | text | tweet_coord | tweet_created | tweet_location | user_timezone |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 570306133677760512 | neutral | 1.0 | | | Virgin America | | cairdin | | 0 | @VirginAmerica What @dhepburn said. | | 2015-02-24 11:35:52 -0800 | | Eastern Time (US & Canada) |
| 1 | 570301130888122368 | positive | 0.3486 | | 0.0 | Virgin America | | jnardino | | 0 | @VirginAmerica plus you've added commercials t... | | 2015-02-24 11:15:59 -0800 | | Pacific Time (US & Canada) |
drop_cols = ['high','low']
df_stock_data.drop(drop_cols, axis=1, inplace=True)
df_stock_data.rename(columns={"Name":"ticker",}, inplace=True)
df_stock_data.head(2)
# Large data set... this may take a second or two
|   | date | open | close | volume | ticker |
|---|------|------|-------|--------|--------|
| 0 | 2013-02-08 | 15.07 | 14.75 | 8407500 | AAL |
| 1 | 2013-02-11 | 14.89 | 14.46 | 8882000 | AAL |
drop_cols = ['tweet_id','name','text','tweet_location','airline_sentiment_gold','negativereason_gold','tweet_coord','user_timezone']
df_tweets.drop(drop_cols, axis=1, inplace=True)
df_tweets.rename(columns={"tweet_created": "tweet_date",
                          "airline_sentiment": "sentiment",
                          "airline_sentiment_confidence": "sentiment_conf",
                          "negativereason_confidence": "neg_conf",
                          "negativereason": "neg_reason"
                          }, inplace=True)
df_tweets.head(2)
|   | sentiment | sentiment_conf | neg_reason | neg_conf | airline | retweet_count | tweet_date |
|---|-----------|----------------|------------|----------|---------|---------------|------------|
| 0 | neutral | 1.0 | | | Virgin America | 0 | 2015-02-24 11:35:52 -0800 |
| 1 | positive | 0.3486 | | 0.0 | Virgin America | 0 | 2015-02-24 11:15:59 -0800 |
# Modify the date in the tweets dataframe: exact=False lets the format match
# just the YYYY-MM-DD prefix of the full timestamp (exact=True would raise)
df_tweets['tweet_date'] = pd.to_datetime(df_tweets['tweet_date'], format='%Y-%m-%d', exact=False)
df_tweets['tweet_date'] = df_tweets['tweet_date'].dt.strftime('%Y-%m-%d')
df_tweets.head(2)
|   | sentiment | sentiment_conf | neg_reason | neg_conf | airline | retweet_count | tweet_date |
|---|-----------|----------------|------------|----------|---------|---------------|------------|
| 0 | neutral | 1.0 | | | Virgin America | 0 | 2015-02-24 |
| 1 | positive | 0.3486 | | 0.0 | Virgin America | 0 | 2015-02-24 |
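Since only the local calendar date is needed, an equivalent one-liner (a sketch, not the author's method) slices the date prefix off the raw strings before any parsing:

```python
# Equivalent sketch: the raw tweet_created strings begin with YYYY-MM-DD,
# so slicing the first ten characters yields the same local date.
# df_tweets['tweet_date'] = df_tweets['tweet_date'].str[:10]
```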
# Add a dense integer surrogate key as the first column
initial_value = 1
df_stock_data.insert(loc=0, column='stock_id', value=range(initial_value, len(df_stock_data) + initial_value))
df_stock_data.head(2)
|   | stock_id | date | open | close | volume | ticker |
|---|----------|------|------|-------|--------|--------|
| 0 | 1 | 2013-02-08 | 15.07 | 14.75 | 8407500 | AAL |
| 1 | 2 | 2013-02-11 | 14.89 | 14.46 | 8882000 | AAL |
initial_value = 1
df_tweets.insert(loc=0, column='tweet_id', value=range(initial_value, len(df_tweets) + initial_value))
df_tweets.head(2)
|   | tweet_id | sentiment | sentiment_conf | neg_reason | neg_conf | airline | retweet_count | tweet_date |
|---|----------|-----------|----------------|------------|----------|---------|---------------|------------|
| 0 | 1 | neutral | 1.0 | | | Virgin America | 0 | 2015-02-24 |
| 1 | 2 | positive | 0.3486 | | 0.0 | Virgin America | 0 | 2015-02-24 |
dataframe = df_stock_data
table_name = 'stock_data'
primary_key = 'stock_id'
db_operation = "insert"
set_dataframe(user_id, pwd, host_name, dst_dbname, dataframe, table_name, primary_key, db_operation)
dataframe = df_tweets
table_name = 'tweet_data'
primary_key = 'tweet_id'
db_operation = "insert"
set_dataframe(user_id, pwd, host_name, dst_dbname, dataframe, table_name, primary_key, db_operation)
sql_stock_data = "SELECT * FROM stocks_and_tweets.stock_data;"
df_stocks_data_sql = get_sql_dataframe(user_id, pwd, host_name, dst_dbname, sql_stock_data)
df_stocks_data_sql.head(2)
|   | stock_id | date | open | close | volume | ticker |
|---|----------|------|------|-------|--------|--------|
| 0 | 1 | 2013-02-08 | 15.07 | 14.75 | 8407500 | AAL |
| 1 | 2 | 2013-02-11 | 14.89 | 14.46 | 8882000 | AAL |
sql_tweet_data = "SELECT * FROM stocks_and_tweets.tweet_data;"
df_tweet_data_sql = get_sql_dataframe(user_id, pwd, host_name, dst_dbname, sql_tweet_data)
df_tweet_data_sql.head(2)
|   | tweet_id | sentiment | sentiment_conf | neg_reason | neg_conf | airline | retweet_count | tweet_date |
|---|----------|-----------|----------------|------------|----------|---------|---------------|------------|
| 0 | 1 | neutral | 1.0 | | | Virgin America | 0 | 2015-02-24 |
| 1 | 2 | positive | 0.3486 | | 0.0 | Virgin America | 0 | 2015-02-24 |
# Records Count for stock data
sql_records_stock_count = "SELECT count(*) AS stock_row_count FROM stocks_and_tweets.stock_data;"
df_records_stock_count = get_sql_dataframe(user_id, pwd, host_name, dst_dbname, sql_records_stock_count)
print(df_records_stock_count)
   stock_row_count
0           219868
# Records Count for twitter data
sql_records_tweet_count = "SELECT count(*) AS tweet_row_count FROM stocks_and_tweets.tweet_data;"
df_records_tweet_count = get_sql_dataframe(user_id, pwd, host_name, dst_dbname, sql_records_tweet_count)
print(df_records_tweet_count)
   tweet_row_count
0            14640
# Column count for stock data
sql_column_stock_count = "SELECT count(*) AS stock_column_count FROM information_schema.columns WHERE table_name = 'stock_data';"
df_column_stock_count = get_sql_dataframe(user_id, pwd, host_name, dst_dbname, sql_column_stock_count)
print(df_column_stock_count)
# Reference: https://stackoverflow.com/questions/658395/find-the-number-of-columns-in-a-table
   stock_column_count
0                   6
# Column Count for twitter data
sql_column_tweet_count = "SELECT count(*) AS tweet_column_count FROM information_schema.columns WHERE table_name = 'tweet_data';"
df_column_tweet_count = get_sql_dataframe(user_id, pwd, host_name, dst_dbname, sql_column_tweet_count)
print(df_column_tweet_count)
   tweet_column_count
0                   8
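Note that these `information_schema` lookups match `table_name` across every schema on the server; a hedged refinement also filters on `table_schema` so same-named tables in other databases are not counted:

```python
# Scope the column count to the destination schema only.
sql_scoped = ("SELECT count(*) AS tweet_column_count "
              "FROM information_schema.columns "
              "WHERE table_schema = 'stocks_and_tweets' "
              "AND table_name = 'tweet_data';")
df_scoped = get_sql_dataframe(user_id, pwd, host_name, dst_dbname, sql_scoped)
```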
sql_positive_count = "SELECT COUNT(*) FROM stocks_and_tweets.tweet_data WHERE sentiment='positive';"
df_positive_count = get_sql_dataframe(user_id, pwd, host_name, dst_dbname, sql_positive_count)
print("The number of tweets with positive sentiment from 2015-02-16 to 2015-02-24 are shown below: ")
df_positive_count
The number of tweets with positive sentiment from 2015-02-16 to 2015-02-24 are shown below:
|   | COUNT(*) |
|---|----------|
| 0 | 2363 |
sql_negative_count = "SELECT COUNT(*) FROM stocks_and_tweets.tweet_data WHERE sentiment='negative';"
df_negative_count = get_sql_dataframe(user_id, pwd, host_name, dst_dbname, sql_negative_count)
print("The number of tweets with negative sentiment from 2015-02-16 to 2015-02-24 are shown below: ")
df_negative_count
The number of tweets with negative sentiment from 2015-02-16 to 2015-02-24 are shown below:
|   | COUNT(*) |
|---|----------|
| 0 | 9178 |
sql_aal_during_that_time = """SELECT open, close, ticker, date
                              FROM stocks_and_tweets.stock_data
                              WHERE stock_data.ticker = 'AAL'
                                AND stock_data.date BETWEEN '2015-02-16' AND '2015-02-24';"""
df_aal_during_that_time = get_sql_dataframe(user_id, pwd, host_name, dst_dbname, sql_aal_during_that_time)
print("Below is the sql query for American Airlines during that same short time period: ")
df_aal_during_that_time
Below is the sql query for American Airlines during that same short time period:
|   | open | close | ticker | date |
|---|------|-------|--------|------|
| 0 | 48.77 | 48.01 | AAL | 2015-02-17 |
| 1 | 48.6 | 49.31 | AAL | 2015-02-18 |
| 2 | 50.78 | 49.78 | AAL | 2015-02-19 |
| 3 | 49.81 | 51.02 | AAL | 2015-02-20 |
| 4 | 51.75 | 51.31 | AAL | 2015-02-23 |
| 5 | 50.29 | 51.52 | AAL | 2015-02-24 |
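To relate the two result sets, the frames built above could be aligned on calendar date; a hedged sketch with hypothetical variable names (`daily_neg`, `df_joined`):

```python
# Hypothetical sketch: count negative tweets per day and join them to the
# AAL closing prices over the overlapping dates.
daily_neg = (df_tweets[df_tweets['sentiment'] == 'negative']
             .groupby('tweet_date').size()
             .rename('neg_tweets').reset_index())
df_aal = df_stock_data[df_stock_data['ticker'] == 'AAL'][['date', 'close']]
df_joined = df_aal.merge(daily_neg, left_on='date', right_on='tweet_date')
```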