# (shell) Export temporary AWS STS credentials so the S3-hosted dataset can be
# read. Substitute your own session values; never commit real keys.
export AWS_ACCESS_KEY_ID=<your-access-key-id>
export AWS_SECRET_ACCESS_KEY=<your-secret-access-key>
export AWS_SESSION_TOKEN=<your-session-token>
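
# The dask/s3fs stack picks these environment variables up automatically.
# Equivalently (a sketch, assuming an s3fs-backed read; not in the original),
# the credentials can be passed explicitly:
#
#     import os
#     dd.read_csv(path, storage_options={
#         'key': os.environ['AWS_ACCESS_KEY_ID'],
#         'secret': os.environ['AWS_SECRET_ACCESS_KEY'],
#         'token': os.environ['AWS_SESSION_TOKEN'],
#     })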
import json
import ctypes

from dask.distributed import Client
import dask.dataframe as dd


def trim_memory() -> int:
    """Ask glibc to release freed heap memory back to the OS; run on each
    worker to reduce unmanaged memory buildup."""
    libc = ctypes.CDLL("libc.so.6")
    return libc.malloc_trim(0)
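
# Note: malloc_trim is glibc-specific, so trim_memory assumes Linux workers.
# A guarded variant (a sketch, not from the original) could fall back safely:
#
#     def trim_memory() -> int:
#         try:
#             return ctypes.CDLL("libc.so.6").malloc_trim(0)
#         except OSError:  # non-glibc platform
#             return 0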
def PA0(path_to_user_reviews_csv):
    """
    Given a path to a reviews dataset, creates an aggregated dataframe based on
    the specified schema: <https://haojian.github.io/DSC102SP24/static_files/pa0_files/PA0_Discussion_Session.pdf>
    and saves it to an output JSON file for comparison with the expected output.

    Parameters
    ----------
    path_to_user_reviews_csv : str
        filepath to the dataset (hosted on AWS S3)

    Returns
    -------
    None
    """
    client = Client()
    # Ask every worker to return freed memory to the OS (helps with memory
    # leaks), then restart so the run starts from a clean slate.
    client.run(trim_memory)
    client = client.restart()

    # read in the user reviews CSV (dtypes are inferred from a sample)
    user_reviews_dd = dd.read_csv(path_to_user_reviews_csv)
    # The schema needs six columns: reviewerID, number of products rated,
    # average rating, first review year, helpful votes, and total votes
    # (check whether any of them need imputation).
    # The helpful field arrives as a string of the form
    # "[<helpful_votes>, <total_votes>]": strip the brackets, split on ', ',
    # and cast each piece back to int.
    # df1: helpful (up)votes
    user_reviews_dd['df1'] = user_reviews_dd['helpful'].apply(
        lambda x: int(x[1:-1].split(', ')[0]),
        meta=('df1', 'int64'),
    )
    # df2: total votes
    user_reviews_dd['df2'] = user_reviews_dd['helpful'].apply(
        lambda x: int(x[1:-1].split(', ')[1]),
        meta=('df2', 'int64'),
    )
    # df3: first review year (last four characters of reviewTime)
    user_reviews_dd['df3'] = user_reviews_dd['reviewTime'].str[-4:].astype(int)

    # drop the columns that are not needed for the aggregation
    user_reviews_dd = user_reviews_dd.drop(
        ['unixReviewTime', 'helpful', 'reviewText', 'summary', 'reviewerName'],
        axis=1,
    )
    # group by reviewerID, using agg to specify one aggregation per column
    user_reviews_dd = user_reviews_dd.groupby('reviewerID').agg({
        'df1': 'sum',       # total helpful votes
        'df2': 'sum',       # total votes
        'df3': 'min',       # first year reviewing
        'asin': 'count',    # count = number of products rated
        'overall': 'mean',  # average rating
    })
    # enforce the types required by the schema; the columns still carry their
    # working names (df1/df2/df3) at this point, so cast those
    user_reviews_dd = user_reviews_dd.astype({
        'df1': int,
        'df2': int,
        'df3': int,
        'asin': int,
        'overall': float,
    }).reset_index()
    # rename the working columns to the names specified in the schema
    # (an explicit mapping avoids mismatching names positionally)
    user_reviews_dd = user_reviews_dd.rename(columns={
        'df1': 'helpful_votes',
        'df2': 'total_votes',
        'df3': 'reviewing_since',
        'asin': 'number_products_rated',
        'overall': 'avg_ratings',
    })
    # describe() summarizes each column; compute() triggers the actual work
    submit = user_reviews_dd.describe().compute().round(2)
    with open('results_PA0.json', 'w') as outfile:
        json.dump(json.loads(submit.to_json()), outfile)
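
# Example driver (a sketch, not part of the graded code): the S3 URI below is
# a hypothetical placeholder for the dataset path handed out with PA0.
if __name__ == '__main__':
    PA0('s3://dsc102-pa0/user_reviews.csv')  # hypothetical path; substitute your own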