# AWS credentials used to reach the S3-hosted dataset.
#
# SECURITY: never paste real key material into this file. The temporary
# credentials that used to be written here must be treated as leaked and
# revoked. Provide fresh values through the calling environment (for example
# from your AWS SSO / STS tooling) before sourcing this snippet.
for _aws_var in AWS_ACCESS_KEY_ID AWS_SECRET_ACCESS_KEY AWS_SESSION_TOKEN; do
  # Warn instead of exiting so that sourcing this from an interactive shell
  # does not terminate the session.
  if [[ -z "${!_aws_var:-}" ]]; then
    printf 'warning: %s is not set; export it before running the job\n' \
      "${_aws_var}" >&2
  fi
done
unset _aws_var

# Mark the variables for export so child processes (python, aws cli) see them.
export AWS_ACCESS_KEY_ID AWS_SECRET_ACCESS_KEY AWS_SESSION_TOKEN

Screen Shot 2024-04-23 at 3.03.33 PM.png

Screen Shot 2024-04-23 at 3.03.02 PM.png

Screen Shot 2024-04-23 at 3.02.16 PM.png

import json
import ctypes
from dask.distributed import Client
import dask.dataframe as dd

def trim_memory() -> int:
    """
    Calls ``malloc_trim(0)`` from the C library loaded as ``libc.so.6``.

    Returns
    -------
    int
        The value returned by ``malloc_trim(0)``.

    """
    c_library = ctypes.CDLL("libc.so.6")
    trim_status = c_library.malloc_trim(0)
    return trim_status

def PA0(path_to_user_reviews_csv):
    """
    Given a path to a reviews dataset, creates a per-reviewer aggregated
    dataframe based on the specified schema:
    <https://haojian.github.io/DSC102SP24/static_files/pa0_files/PA0_Discussion_Session.pdf>
    and saves its summary statistics (``describe()``, rounded to 2 decimals)
    to ``results_PA0.json`` for comparison w/ expected output.

    Parameters
    ----------
    path_to_user_reviews_csv : str
        filepath to the dataset (hosted on AWS S3)

    Returns
    -------
    None

    """
    client = Client()
    # Helps fix any memory leaks.
    client.run(trim_memory)
    client = client.restart()

    # read in user reviews csv
    user_reviews_dd = dd.read_csv(path_to_user_reviews_csv)

    # 'helpful' is read as text that looks like "[a, b]". The first number is
    # used as the helpful-vote count and the second as the total-vote count
    # (NOTE(review): confirm against the dataset description).
    # `meta` declares the output dtype up front so dask does not have to infer
    # it by running the function on dummy data.
    helpful = user_reviews_dd['helpful']
    user_reviews_dd['helpful_votes'] = helpful.apply(
        lambda x: int(str(x[1:-1]).split(', ')[0]),
        meta=('helpful_votes', 'int64'),
    )
    user_reviews_dd['total_votes'] = helpful.apply(
        lambda x: int(str(x[1:-1]).split(', ')[1]),
        meta=('total_votes', 'int64'),
    )

    # The last four characters of 'reviewTime' are taken as the review year.
    user_reviews_dd['reviewing_since'] = user_reviews_dd['reviewTime'].str[-4:].astype(int)
    user_reviews_dd = user_reviews_dd.drop(
        ['unixReviewTime', 'helpful', 'reviewText', 'summary', 'reviewerName'],
        axis=1,
    )

    # groupby reviewerID
    # use agg to specify multiple types of aggregations per column
    per_reviewer = user_reviews_dd.groupby("reviewerID").agg({
        'asin': 'count',           # count = num products rated
        'overall': 'mean',         # average rating
        'reviewing_since': 'min',  # first review year
        'helpful_votes': 'sum',
        'total_votes': 'sum',
    })

    # Rename by key rather than by position: a positional assignment to
    # `.columns` silently mislabels data whenever the aggregated column order
    # differs from the schema order.
    per_reviewer = per_reviewer.rename(columns={
        'asin': 'number_products_rated',
        'overall': 'avg_ratings',
    })

    # enforce correct types for schema (keys must be columns that exist here)
    per_reviewer = per_reviewer.astype({
        'number_products_rated': int,
        'avg_ratings': float,
        'reviewing_since': int,
        'helpful_votes': int,
        'total_votes': int,
    }).reset_index()

    # Put the columns in the order required by the schema.
    schema_columns = [
        'reviewerID',
        'number_products_rated',
        'avg_ratings',
        'reviewing_since',
        'helpful_votes',
        'total_votes',
    ]
    per_reviewer = per_reviewer[schema_columns]

    submit = per_reviewer.describe().compute().round(2)
    with open('results_PA0.json', 'w') as outfile:
        json.dump(json.loads(submit.to_json()), outfile)

df.groupby('col').mean()

df.groupby('col').median()