---
title: "Data upload"
---
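
This script mirrors the local IRW `.Rdata` files into the Redivis `item_response_warehouse` dataset. Each file becomes one table, uploaded either as a single temporary JSON file (`mode = 'json'`) or as a stream of batched rows (`mode = 'stream'`); tables that already exist on Redivis are skipped.
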
```{python}
import os
import math
import numpy as np
import pandas as pd
import json
import pyreadr
import redivis

# target redivis dataset
dataset = redivis.user('datapages').dataset('item_response_warehouse')

# list of data files to upload
data_path = '/Users/mikabr/dropbox/_langcog/IRW/irw_public/'
files = sorted(os.listdir(data_path), key = str.casefold)

def upload_table(filename, mode = 'json'):

    # set up table object
    dataset_name = os.path.splitext(filename)[0]
    table = dataset.table(dataset_name)

    # skip if table already exists
    if table.exists():
        print('table "%s" already exists, skipping' % dataset_name)
        return

    # otherwise create table
    print('creating table "%s"' % dataset_name)
    table.create()

    # read in data file and convert to expected structure
    print('processing "%s"' % filename)
    df = pyreadr.read_r(data_path + filename)['df']
    df.columns = df.columns.str.replace('.', '_', regex = False)  # replace . with _ in column names
    df_nulled = df.replace({np.nan: None})  # replace NaN with None for JSON
    records = df_nulled.to_dict(orient = 'records')  # [{var: val}, {var: val}]
    print('prepared %s records' % len(records))

    if mode == 'json':
        # write all records to a temporary json file, then upload it in one request
        record_str = json.dumps(records).encode()
        with open('temp_data.json', 'wb') as file:
            print('writing temporary json file')
            file.write(record_str)
        with open('temp_data.json', 'rb') as file:
            print('uploading json file')
            up = table.upload('upload').create(file, type = 'json', rename_on_conflict = True, remove_on_fail = True)

    elif mode == 'stream':
        # split records into max size batches
        batch_size = 50000
        records_batched = np.array_split(records, math.ceil(len(records) / batch_size))
        print('split records into %s batches' % len(records_batched))
        # upload data from each batch
        for batch in records_batched:
            print('uploading %s rows' % len(batch))
            up = table.upload('upload').create(type = 'stream', rename_on_conflict = True, remove_on_fail = True)
            up.insert_rows(batch.tolist())

for f in files:
    upload_table(f)
```
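
Since `upload_table()` skips tables that already exist, the script can simply be re-run after an interrupted upload. A minimal sketch of a read-only check along the same lines, using only the objects defined above and kept commented out like the chunks below, lists which local files still lack a matching table:

```{python}
# sketch: local files without a corresponding Redivis table yet
# local_names = {os.path.splitext(f)[0] for f in files}
# existing = {tbl['name'] for tbl in dataset.list_tables(max_results = 200)}
# missing = sorted(local_names - existing)
# print('%s tables still to upload: %s' % (len(missing), missing))
```

The remaining chunks are one-off checks, kept commented out. The first snapshots table names and row counts from Redivis to `data/table_rows.csv`:
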
```{python}
# tables = dataset.list_tables(max_results = 200)
# names = [tbl['name'] for tbl in tables]
# rows = [tbl['numRows'] for tbl in tables]
# table_rows = pd.DataFrame({'name': names, 'rows': rows})
# table_rows.to_csv('data/table_rows.csv', index = False)
```
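
The second compares those row counts against the `nresp` column of the IRW data dictionary snapshot and builds a quoted vector of the non-internal table names (those not starting with `_`):
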
```{r}
# library(dplyr)
# library(purrr)
# library(readr)
# library(stringr)
# library(glue)
#
# table_rows <- read_csv("data/table_rows.csv")
#
# md <- read_csv("data/IRW Data Dictionary - data snapshot.csv")
# md_comp <- md |>
# slice(2:n()) |>
# select(name = "...1", nresp) |>
# mutate(name = str_remove(name, ".Rdata")) |>
# left_join(table_rows) |>
# arrange(name)
#
# md_comp |> filter(nresp != rows)
# md_comp |> filter(is.na(nresp))
# md_comp |> filter(is.na(rows))
#
# tables <- sort(table_rows$name) |> discard(\(s) str_starts(s, "_"))
# table_str <- glue("'{tables}'") |> paste(collapse = ", ")
# glue("c({table_str})")
```