
Commit

Merged main
mmcdermott committed Sep 4, 2024
2 parents 4e92bf6 + cb301a0 commit 26cbf72
Showing 3 changed files with 80 additions and 20 deletions.
README.md (21 changes: 11 additions & 10 deletions)
@@ -18,7 +18,7 @@ up MEDS, we will define some key terminology that we use in this standard.
In particular, in almost all structured, longitudinal datasets, a measurement can be described as
consisting of a tuple containing a `subject_id` (who this measurement is about); a `time` (when this
measurement happened); some categorical qualifier describing what was measured, which we will call a
-`code`; a value of a given type, such as a `numerical_value`, a `text_value`, or a `categorical_value`;
+`code`; a value of a given type, such as a `numeric_value`, a `text_value`, or a `categorical_value`;
and possibly one or more additional measurement properties that describe the measurement in a
non-standardized manner.
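
For example (a purely illustrative sketch, with hypothetical values not drawn from any real
dataset), a single measurement in this terminology could be written as:

```python
# Only the field names below follow the standard; every value is made up.
measurement = {
    "subject_id": 123,              # who this measurement is about
    "time": "2024-01-01T12:00:00",  # when this measurement happened
    "code": "LAB//HEART_RATE",      # what was measured (hypothetical code)
    "numeric_value": 72.0,          # the value of the measurement
}
```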

@@ -55,12 +55,12 @@ found in the following subfolders:
- `$MEDS_ROOT/metadata/subject_splits.parquet`: This file contains information in the _subject split
  schema_ about what splits different subjects are in.

-Task label dataframes are stored in the _TODO label_ schema, in a file path that depends on both a
+Task label dataframes are stored in the `label_schema`, in a file path that depends on both a
`$TASK_ROOT` directory where task label dataframes are stored and a `$TASK_NAME` parameter that separates
different tasks from one another. In particular, the file glob `glob($TASK_ROOT/$TASK_NAME/**/*.parquet)` will
-retrieve a sharded set of dataframes in the _TODO label_ schema where the sharding matches up precisely with
+retrieve a sharded set of dataframes in the `label_schema` where the sharding may or may not match up with
the sharding used in the raw `$MEDS_ROOT/data/**/*.parquet` files (e.g., the file
-`$TASK_ROOT/$TASK_NAME/$SHARD_NAME.parquet` will cover the labels for the same set of subjects as are
+`$TASK_ROOT/$TASK_NAME/$SHARD_NAME.parquet` may cover the labels for the same set of subjects as are
contained in the raw data file at `$MEDS_ROOT/data/**/*.parquet`). Note that (1) `$TASK_ROOT` may be a subdir
of `$MEDS_ROOT` (e.g., often `$TASK_ROOT` will be set to `$MEDS_ROOT/tasks`), (2) `$TASK_NAME` may have `/`s
in it, thereby rendering the task label directory a deep, nested subdir of `$TASK_ROOT`, and (3) in some
@@ -86,14 +86,15 @@ In addition, it can contain any number of custom properties to further enrich ob
function below generates a pyarrow schema for a given set of custom properties.

```python
-def data(custom_properties=[]):
+def data_schema(custom_properties=[]):
    return pa.schema(
        [
-            ("subject_id", pa.int64()),
-            ("time", pa.timestamp("us")), # Static events will have a null timestamp
-            ("code", pa.string()),
-            ("numeric_value", pa.float32()),
-        ] + custom_properties
+            (subject_id_field, subject_id_dtype),
+            (time_field, time_dtype), # Static events will have a null timestamp
+            (code_field, code_dtype),
+            (numeric_value_field, numeric_value_dtype),
+        ]
+        + custom_properties
    )
```
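
As an illustrative sketch (not part of the specification), the generated schema can be used to
build and validate a small table; the `unit` property and the codes below are purely hypothetical:

```python
import datetime

import pyarrow as pa

from meds import data_schema

# One hypothetical custom property beyond the four required columns.
schema = data_schema(custom_properties=[("unit", pa.string())])

table = pa.table(
    {
        "subject_id": [1, 1],
        "time": [None, datetime.datetime(2024, 1, 1, 12, 0)],  # a null time marks a static measurement
        "code": ["GENDER//FEMALE", "HEART_RATE"],
        "numeric_value": [None, 72.0],
        "unit": [None, "beats/minute"],
    },
    schema=schema,
)
assert table.schema.equals(schema)
```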

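Relatedly, a minimal sketch of consuming the task-label layout described earlier (the `$TASK_ROOT`
and `$TASK_NAME` values here are hypothetical, and any parquet reader would work):

```python
from pathlib import Path

import pyarrow.parquet as pq

task_root = Path("tasks")        # hypothetical $TASK_ROOT, e.g. $MEDS_ROOT/tasks
task_name = "mortality/in_icu"   # hypothetical $TASK_NAME; note the nested "/"

# glob($TASK_ROOT/$TASK_NAME/**/*.parquet): gather every label shard for this task.
label_shards = sorted((task_root / task_name).glob("**/*.parquet"))
labels = [pq.read_table(shard) for shard in label_shards]
```
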
src/meds/__init__.py (28 changes: 28 additions & 0 deletions)
@@ -9,22 +9,46 @@
CodeMetadata,
DatasetMetadata,
Label,
birth_code,
code_dtype,
code_field,
code_metadata_filepath,
code_metadata_schema,
data_schema,
data_subdirectory,
dataset_metadata_filepath,
dataset_metadata_schema,
death_code,
description_dtype,
description_field,
held_out_split,
label_schema,
numeric_value_dtype,
numeric_value_field,
parent_codes_dtype,
parent_codes_field,
prediction_time_field,
subject_id_dtype,
subject_id_field,
subject_split_schema,
subject_splits_filepath,
time_dtype,
time_field,
train_split,
tuning_split,
)

# List all objects that we want to export
_exported_objects = {
"code_metadata_filepath": code_metadata_filepath,
"subject_splits_filepath": subject_splits_filepath,
"dataset_metadata_filepath": dataset_metadata_filepath,
"data_subdirectory": data_subdirectory,
"prediction_time_field": prediction_time_field,
"description_field": description_field,
"description_dtype": description_dtype,
"parent_codes_field": parent_codes_field,
"parent_codes_dtype": parent_codes_dtype,
"data_schema": data_schema,
"label_schema": label_schema,
"Label": Label,
@@ -38,6 +62,10 @@
"DatasetMetadata": DatasetMetadata,
"birth_code": birth_code,
"death_code": death_code,
"numeric_value_dtype": numeric_value_dtype,
"numeric_value_field": numeric_value_field,
"time_dtype": time_dtype,
"code_dtype": code_dtype,
"subject_id_field": subject_id_field,
"time_field": time_field,
"code_field": code_field,
src/meds/schema.py (51 changes: 41 additions & 10 deletions)
@@ -4,6 +4,7 @@
each schema should capture, etc.
"""
import datetime
import os
from typing import List, Optional

import pyarrow as pa
@@ -22,17 +23,30 @@
# Both of these restrictions allow the stream rolling processing (see https://docs.pola.rs/api/python/stable/reference/dataframe/api/polars.DataFrame.rolling.html), # noqa: E501
# which vastly simplifies many data analysis pipelines.
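# As an illustrative aside (not part of this module, and assuming a recent polars API), such a
# per-subject rolling computation might look like:
#     df.rolling(index_column="time", period="30d", group_by="subject_id").agg(
#         pl.col("numeric_value").mean()
#     )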

data_subdirectory = "data"

# We define some codes for particularly important events

subject_id_field = "subject_id"
time_field = "time"
code_field = "code"
numeric_value_field = "numeric_value"

subject_id_dtype = pa.int64()

# The time datatype must use "us" as units to match datetime.datetime's internal resolution
time_dtype = pa.timestamp("us")

code_dtype = pa.string()
numeric_value_dtype = pa.float32()

def data_schema(custom_properties=[]):
    return pa.schema(
        [
            (subject_id_field, subject_id_dtype),
-            (time_field, pa.timestamp("us")), # Static events will have a null timestamp
-            (code_field, pa.string()),
-            ("numeric_value", pa.float32()),
+            (time_field, time_dtype), # Static events will have a null timestamp
+            (code_field, code_dtype),
+            (numeric_value_field, numeric_value_dtype),
        ]
        + custom_properties
    )
@@ -47,11 +61,13 @@ def data_schema(custom_properties=[]):
# including the prediction time. Exclusive prediction times are not currently supported, but if you have a use
# case for them please add a GitHub issue.

prediction_time_field = "prediction_time"

label_schema = pa.schema(
    [
        (subject_id_field, subject_id_dtype),
        # The subject who is being labeled.
-        ("prediction_time", pa.timestamp("us")),
+        (prediction_time_field, time_dtype),
        # The time the prediction is made.
        # Machine learning models are allowed to use features that have timestamps less than or equal
        # to this timestamp.
@@ -68,8 +84,8 @@ def data_schema(custom_properties=[]):
Label = TypedDict(
    "Label",
    {
-        "subject_id": int,
-        "prediction_time": datetime.datetime,
+        subject_id_field: int,
+        prediction_time_field: datetime.datetime,
        "boolean_value": Optional[bool],
        "integer_value": Optional[int],
        "float_value": Optional[float],
@@ -83,6 +99,8 @@ def data_schema(custom_properties=[]):

# The subject split schema.

subject_splits_filepath = os.path.join("metadata", "subject_splits.parquet")

train_split = "train" # For ML training.
tuning_split = "tuning" # For ML hyperparameter tuning. Also often called "validation" or "dev".
held_out_split = "held_out" # For final ML evaluation. Also often called "test".
@@ -99,6 +117,8 @@ def data_schema(custom_properties=[]):
# The dataset metadata schema.
# This is a JSON schema.

dataset_metadata_filepath = os.path.join("metadata", "dataset.json")

dataset_metadata_schema = {
    "type": "object",
    "properties": {
@@ -131,14 +151,21 @@ def data_schema(custom_properties=[]):
# The code metadata schema.
# This is a parquet schema.

code_metadata_filepath = os.path.join("metadata", "codes.parquet")

description_field = "description"
description_dtype = pa.string()

parent_codes_field = "parent_codes"
parent_codes_dtype = pa.list_(pa.string())

# Code metadata must contain at least one row for every unique code in the dataset
def code_metadata_schema(custom_per_code_properties=[]):
    return pa.schema(
        [
-            ("code", pa.string()),
-            ("description", pa.string()),
-            ("parent_codes", pa.list_(pa.string())),
+            (code_field, code_dtype),
+            (description_field, description_dtype),
+            (parent_codes_field, parent_codes_dtype),
            # parent_codes must be a list of strings, each string being a higher level
            # code that represents a generalization of the provided code. Parent codes
            # can use any structure, but it is recommended that they reference OMOP concepts
@@ -152,4 +179,8 @@ def code_metadata_schema(custom_per_code_properties=[]):

# Python type for the above schema

-CodeMetadata = TypedDict("CodeMetadata", {"code": str, "description": str, "parent_codes": List[str]}, total=False)
+CodeMetadata = TypedDict(
+    "CodeMetadata",
+    {code_field: str, description_field: str, parent_codes_field: List[str]},
+    total=False
+)
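
# As an illustrative aside (hypothetical values, not part of this module), one code metadata entry
# might look like:
#     CodeMetadata(
#         code="LAB//HEART_RATE",
#         description="Heart rate measurement",
#         parent_codes=["LOINC/8867-4"],
#     )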
