Creating document-type datasets from the JUUL Labs Collection
In this tutorial, we will walk through how to create a dataset of all documents of a particular type from the State of North Carolina vs. JUUL Labs case within the JUUL Labs Collection. This workflow can be adjusted to work with other document types, documents from other cases, or documents from other collections in the Industry Documents Library.
The driving reason for this tutorial is that the zipped folder for the JUUL Labs Collection is almost 32 gigabytes on its own, and much larger once unzipped. This can take up a lot of space on your computer, especially if you're also creating derivative datasets.
We recommend downloading the zipped folder and running this code in a virtual machine or cloud computing environment, then downloading the created dataset to your own computer for processing. If you can do all of your analysis in that virtual machine or cloud environment, all the better!
# Let's import the necessary libraries for this tutorial
import os
import zipfile
import polars as pl
from concurrent.futures import ThreadPoolExecutor
import threading
from tqdm import tqdm # For progress tracking
First, we'll specify our zipped folder's path and where we want to save the file we create.
zip_path = 'JUUL_Labs_Collection.zip'  # path to the downloaded zipped folder
save_dir = '/content/unc-emails'       # where the filtered dataset will be saved
last_num = 0                           # index of the first file to process (passed to save_content below)
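Before filtering anything, it can help to peek inside the archive without extracting it. The short sketch below (assuming zip_path points at the downloaded archive) counts the CSV files and reports their total uncompressed size.
with zipfile.ZipFile(zip_path, 'r') as zipf:
    csv_members = [name for name in zipf.namelist() if name.endswith('.csv')]
    total_mb = sum(info.file_size for info in zipf.infolist()) / 1024 ** 2  # uncompressed bytes -> MB
    print(f"{len(csv_members)} CSV files, roughly {total_mb:,.0f} MB uncompressed")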
process_file takes a CSV from the zipped folder and keeps only the rows that belong to the State of North Carolina vs. JUUL Labs case and are emails. You can change which document type the function filters for (see the adjusted filter sketch after the function below). The different document types are:
- document
- unknown
- html
- spreadsheet
- image
- text
- calendar
- presentation
- other
- video
- audio
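If you want to confirm these type values for yourself before running the full filter, one option is to read a single CSV from the archive and list the unique entries in its type column. This is only a sketch and assumes the pipe-delimited layout that process_file below expects.
with zipfile.ZipFile(zip_path, 'r') as zipf:
    # Grab the first CSV member as a sample
    first_csv = next(name for name in zipf.namelist() if name.endswith('.csv'))
    with zipf.open(first_csv) as f:
        sample = pl.read_csv(f, separator='|')
print(sample['type'].unique().to_list())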
def process_file(zip_dir, save_dir, file, lock, main_df_dict):
    # Only process CSV files
    if file.endswith('.csv'):
        with zipfile.ZipFile(zip_dir, 'r') as zipf:
            with zipf.open(file) as f:
                try:
                    # Filter for 'State of North Carolina' and 'email' (this can be adjusted for your project)
                    temp_df = pl.read_csv(f, separator='|')\
                        .filter((pl.col('case').str.contains('State of North Carolina')),
                                (pl.col('type').str.contains('email')))  # you can change this document type depending on your research
                    if temp_df.shape[0] != 0:
                        # Append to the shared dataframe under the lock so threads don't overwrite each other
                        with lock:
                            if 'df' not in main_df_dict:
                                main_df_dict['df'] = temp_df
                            else:
                                main_df_dict['df'] = pl.concat([main_df_dict['df'], temp_df])
                        return f'Successfully processed {file} and added to main dataframe'
                    return f'Empty result for {file}'
                except Exception as e:
                    return f'Error processing {file}: {str(e)}'
    return f'Skipped non-CSV file: {file}'
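As noted above, you can swap the document type (or the case) by editing the filter expression inside process_file. For example, a minimal variant that keeps presentations from the same case instead of emails, assuming the same case and type columns, would look like this:
# Hypothetical replacement for the filter inside process_file:
# keep presentations from the North Carolina case instead of emails
temp_df = pl.read_csv(f, separator='|')\
    .filter((pl.col('case').str.contains('State of North Carolina')),
            (pl.col('type').str.contains('presentation')))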
The function below runs process_file with six workers, so the jobs run in parallel. This makes filtering through a couple thousand CSVs quite a bit faster.
Lastly, it saves the combined dataframe as a parquet file.
def save_content(zip_dir, save_dir, start_index=0):
    # Create save directory if it doesn't exist
    os.makedirs(save_dir, exist_ok=True)

    # Shared dictionary to store the combined dataframe
    main_df_dict = {}
    lock = threading.Lock()

    # Get list of CSV files to process
    with zipfile.ZipFile(zip_dir, 'r') as zipf:
        files_to_process = [file for idx, file in enumerate(zipf.namelist())
                            if idx >= start_index and file.endswith('.csv')]

    total_files = len(files_to_process)
    print(f"Starting to process {total_files} CSV files with 6 workers (from index {start_index})...")

    # Process files in parallel
    with ThreadPoolExecutor(max_workers=6) as executor:
        # Submit one process_file job per CSV
        futures = []
        for file in files_to_process:
            future = executor.submit(process_file, zip_dir, save_dir, file, lock, main_df_dict)
            futures.append(future)

        # Using tqdm to show progress as each job finishes
        for future in tqdm(futures, total=total_files, desc="Processing files"):
            result = future.result()

    print(f"Completed processing {total_files} files starting from index {start_index}")

    # Save the combined dataframe
    if 'df' in main_df_dict and main_df_dict['df'] is not None:
        output_file = os.path.join(save_dir, 'juul_nc_emails.parquet')
        main_df_dict['df'].write_parquet(output_file)
        print(f"Saved combined dataframe with {main_df_dict['df'].shape[0]} rows to {output_file}")
    else:
        print("No data was found to save")
save_content(zip_path, save_dir, last_num)
Starting to process 2206 files with 6 workers (from index 0)...
Processing files: 100%|██████████| 2206/2206 [11:56<00:00, 3.08it/s]
Completed processing 2206 files starting from index 0
Now, let's read the saved parquet file to verify the results. This is a simple check to see if the file was saved correctly by verifying the number of rows.
df = pl.read_parquet(os.path.join(save_dir, 'juul_nc_emails.parquet'))
print(f"Number of rows: {df.shape[0]}")
Number of rows: 1685701
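Beyond the row count, a quick look at the schema and a few sample rows can confirm that only North Carolina email records made it into the dataset. This assumes the case and type column names from the collection's metadata CSVs.
# Spot-check the columns and the filtered values
print(df.schema)
print(df.select(['case', 'type']).head())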