Tasrif

A Python framework for processing wearable data in the health domain.

Tasrif is a library for processing eHealth data. It provides:

  • A pipeline DSL for chaining together commonly used processing operations on time-series eHealth data, such as resampling, normalization, etc.
  • DataReaders for reading eHealth datasets such as MyHeartCounts, SleepHealth and data from FitBit devices.

Installation

To use Tasrif, you will need to have the package installed. Please follow the steps below to install Tasrif:

First, create a virtual environment using venv on a Linux machine or under Windows Subsystem for Linux:

# Create a virtual environment
python3 -m venv tasrif-env

# Activate the virtual environment
source tasrif-env/bin/activate

# Upgrade pip
(tasrif-env) pip install --upgrade pip

Then, install Tasrif either from PyPI

(tasrif-env) pip install tasrif

or install from source

(tasrif-env) git clone https://github.com/qcri/tasrif
(tasrif-env) cd tasrif
(tasrif-env) pip install -e .
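
You can sanity-check the installation by importing the package (assuming the virtual environment is still active):

(tasrif-env) python -c "import tasrif"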

Important installation note: one of Tasrif's dependencies is pyjq, which requires gcc and related build tools on your local machine. Specifically, pyjq requires autoconf, automake, libtool, python3-devel.x86_64, python3-tkinter, python-pip, jq and awscli. See more about this issue here. To avoid the hassle of installing these libraries, we recommend running Tasrif with Docker.
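
On RPM-based distributions, these system packages can typically be installed with yum (a sketch only; the package manager and exact package names vary by distribution):

sudo yum install -y autoconf automake libtool python3-devel.x86_64 python3-tkinter python-pip jq awscli gcc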

If no installation errors occur, see the Quick start by use case section to start using Tasrif.

Running Tasrif with Docker

To avoid the hassle of installing Tasrif, you can use it in a Docker container that launches a local Jupyter notebook. Make sure you have Docker installed on your system before running the following commands.

cd tasrif
docker build -t tasrif .
docker run -i -p 8888:8888 tasrif

You can mount a local directory into the container with the following command:

docker run -i -v <some/local/directory>:/home/mnt -p 8888:8888 tasrif
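
For example, to mount your current working directory (a POSIX-shell sketch; $(pwd) expands to the current directory):

docker run -i -v $(pwd):/home/mnt -p 8888:8888 tasrif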

After running the container, visit http://127.0.0.1:8888/ in your preferred browser to work with the Jupyter notebook.

Note on feature extraction using Tasrif

Due to some outdated internal Tasrif dependencies on PyPI, we have placed those dependencies in requirements.txt. Once those packages are updated on PyPI, we will move them back to setup.py. The current requirements.txt pulls the dependencies directly from GitHub. If you plan to use the TSFreshFeatureExtractorOperator or CalculateTimeseriesPropertiesOperator operators, you will need the TSFresh and Kats packages installed, which can be done by running the following command:

(tasrif-env) MINIMAL_KATS=1 pip install -r requirements.txt

Note that MINIMAL_KATS=1 is passed to the installation script to install Kats minimally. See requirements.txt for details.

Features

Pipeline DSL

Tasrif provides a variety of processing operators that can be chained together in a pipeline. The operators take Pandas DataFrames as input and produce DataFrames as output.

For example, consider the AggregateOperator:

>>> import pandas as pd
>>> from tasrif.processing_pipeline.custom import AggregateOperator
>>> from tasrif.processing_pipeline.pandas import DropNAOperator

>>> df0 = pd.DataFrame([
        ['Doha', 25, 30],
        ['Doha', 17, 50],
        ['Dubai', 20, 40],
        ['Dubai', 21, 42]],
        columns=['city', 'min_temp', 'max_temp'])

>>> operator = AggregateOperator(
    groupby_feature_names="city",
    aggregation_definition={"min_temp": ["mean", "std"]})

>>> df0 = operator.process(df0)

>>> df0

[    city  min_temp_mean  min_temp_std
0   Doha           21.0      5.656854
1  Dubai           20.5      0.707107]

Operators are meant to be used as part of a pipeline, where they can be chained together for sequential processing of data:

>>> import pandas as pd
>>> from tasrif.processing_pipeline import SequenceOperator
>>> from tasrif.processing_pipeline.custom import AggregateOperator, CreateFeatureOperator
>>> from tasrif.processing_pipeline.pandas import ConvertToDatetimeOperator, SortOperator

>>> df0 = pd.DataFrame([
        ['15-07-2021', 'Doha', 25, 30],
        ['16-07-2021', 'Doha', 17, 50],
        ['15-07-2021', 'Dubai', 20, 40],
        ['16-07-2021', 'Dubai', 21, 42]],
        columns=['date', 'city', 'min_temp', 'max_temp'])

>>> pipeline = SequenceOperator([
        ConvertToDatetimeOperator(feature_names=["date"]),
        CreateFeatureOperator(
            feature_name='avg_temp',
            feature_creator=lambda df: (df['min_temp'] + df['max_temp'])/2),
        SortOperator(by='avg_temp')
    ])

>>> pipeline.process(df0)
[        date   city  min_temp  max_temp  avg_temp
0 2021-07-15   Doha        25        30      27.5
2 2021-07-15  Dubai        20        40      30.0
3 2021-07-16  Dubai        21        42      31.5
1 2021-07-16   Doha        17        50      33.5]

DataReaders

Tasrif also comes with DataReader classes for importing various eHealth datasets into pipelines. These readers preprocess the raw data and convert them into a DataFrame for downstream processing in a pipeline.

Supported datasets include MyHeartCounts, SleepHealth, and data from FitBit devices; see the tasrif.data_readers module for the full list.

DataReaders can be used by treating them as source operators in a pipeline:

from tasrif.processing_pipeline import SequenceOperator
from tasrif.data_readers.my_heart_counts import DayOneSurveyDataset
from tasrif.processing_pipeline.pandas import DropNAOperator, SetIndexOperator

day_one_survey_path = <path to MyHeartCounts DayOneSurvey file>

pipeline = SequenceOperator([
    DayOneSurveyDataset(day_one_survey_path),
    DropNAOperator(),
    SetIndexOperator('healthCode'),
])

pipeline.process()

Quick start by use case

Reading data

Reading a single CSV file

from tasrif.processing_pipeline.pandas import ReadCsvOperator

operator = ReadCsvOperator('examples/quick_start/csvs/participant1.csv')
df = operator.process()[0]

Reading multiple CSVs in a folder

from tasrif.processing_pipeline.custom import ReadCsvFolderOperator

operator = ReadCsvFolderOperator(name_pattern='examples/quick_start/csvs/*.csv')
df = operator.process()[0]

By default, ReadCsvFolderOperator concatenates the CSVs into one dataframe. If you would like to work on the CSVs separately, pass the argument concatenate=False to ReadCsvFolderOperator, which then returns a Python generator that iterates over the CSVs, as shown below.
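
A minimal sketch (reusing the example folder above and assuming the generator yields one dataframe per CSV):

from tasrif.processing_pipeline.custom import ReadCsvFolderOperator

operator = ReadCsvFolderOperator(name_pattern='examples/quick_start/csvs/*.csv',
                                 concatenate=False)
generator = operator.process()[0]

# Each iteration yields the contents of one CSV file
for df in generator:
    print(df.head())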

Reading CSVs referenced by a column in dataframe df

import pandas as pd
from tasrif.processing_pipeline.custom import ReadNestedCsvOperator

df = pd.DataFrame({"name": ['Alfred', 'Roy'],
                   "age": [43, 32],
                   "csv_files_column": ['participant1.csv', 'participant2.csv']})

operator = ReadNestedCsvOperator(folder_path='examples/quick_start/csvs/',
                                 field='csv_files_column')
generator = operator.process(df)[0]

for record, details in generator:
    print(record)
    print(details)

Reading JSON files referenced by a column in dataframe df

import pandas as pd
from tasrif.processing_pipeline.custom import IterateJsonOperator

df = pd.DataFrame({"name": ['Alfred', 'Roy'],
                   "age": [43, 32],
                   "json_files_column": ['participant1.json', 'participant2.json']})

operator = IterateJsonOperator(folder_path='examples/quick_start/jsons/',
                               field='json_files_column',
                               pipeline=None)
generator = operator.process(df)[0]

for record, details in generator:
    print(record)
    print(details)

Compute statistics

Compute quick statistics using StatisticsOperator, which reports counts of rows, missing data, duplicate rows, and more.

import pandas as pd
from tasrif.processing_pipeline.custom import StatisticsOperator

df = pd.DataFrame( [
    ['2020-02-20', 1000, 1800, 1], ['2020-02-21', 5000, 2100, 1], ['2020-02-22', 10000, 2400, 1],
    ['2020-02-20', 1000, 1800, 1], ['2020-02-21', 5000, 2100, 1], ['2020-02-22', 10000, 2400, 1],
    ['2020-02-20', 0, 1600, 2], ['2020-02-21', 4000, 2000, 2], ['2020-02-22', 11000, 2400, 2],
    ['2020-02-20', None, 2000, 3], ['2020-02-21', 0, 2700, 3], ['2020-02-22', 15000, 3100, 3]],
columns=['Day', 'Steps', 'Calories', 'PersonId'])

filter_features = {
    'Steps': lambda x : x > 0
}

sop = StatisticsOperator(participant_identifier='PersonId',
                         date_feature_name='Day',
                         filter_features=filter_features)
sop.process(df)[0]

Or use ParticipationOverviewOperator to see statistics per participant; pass the argument overview_type="date_vs_features" instead to compute statistics per date. See below:

from tasrif.processing_pipeline.custom import ParticipationOverviewOperator

sop = ParticipationOverviewOperator(participant_identifier='PersonId',
                                    date_feature_name='Day',
                                    overview_type='participant_vs_features')
sop.process(df)[0]
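
The per-date variant differs only in the overview_type argument:

sop = ParticipationOverviewOperator(participant_identifier='PersonId',
                                    date_feature_name='Day',
                                    overview_type='date_vs_features')
sop.process(df)[0]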

Use AggregateOperator if you require specific statistics for particular columns:

from tasrif.processing_pipeline.custom import AggregateOperator

operator = AggregateOperator(groupby_feature_names="PersonId",
                             aggregation_definition={"Steps": ["mean", "std"],
                                                     "Calories": ["sum"]})
operator.process(df)[0]

Extract features from existing columns

Convert time columns into cyclical features, which machine learning models can capture more effectively:

from tasrif.processing_pipeline.custom import EncodeCyclicalFeaturesOperator
from tasrif.processing_pipeline.pandas import ReadCsvOperator

df = ReadCsvOperator('examples/quick_start/steps_per_day.csv',
                     parse_dates=['Date']).process()[0]

operator = EncodeCyclicalFeaturesOperator(date_feature_name="Date",
                                          category_definition="day")
operator.process(df)[0]
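
For intuition, cyclical encoding maps a periodic value onto sine and cosine components so that, for example, hour 23 and hour 0 land close together. A standalone sketch of the idea in plain pandas/NumPy (independent of the operator's exact output columns):

import numpy as np
import pandas as pd

hours = pd.Series([0, 6, 12, 23])
hour_sin = np.sin(2 * np.pi * hours / 24)  # x-coordinate on the 24-hour circle
hour_cos = np.cos(2 * np.pi * hours / 24)  # y-coordinate on the 24-hour circle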

Extract timeseries features using CalculateTimeseriesPropertiesOperator, which internally uses the Kats package:

from tasrif.processing_pipeline.kats import CalculateTimeseriesPropertiesOperator
from tasrif.processing_pipeline.pandas import ReadCsvOperator

df = ReadCsvOperator('examples/quick_start/long_ts.csv',
                     parse_dates=['Date']).process()[0]


operator = CalculateTimeseriesPropertiesOperator(date_feature_name="Date", value_column='Steps')
operator.process(df)[0]

Extract features using the tsfresh package:

from tasrif.processing_pipeline.custom import SlidingWindowOperator
from tasrif.processing_pipeline.pandas import ReadCsvOperator
from tasrif.processing_pipeline.tsfresh import TSFreshFeatureExtractorOperator

df = ReadCsvOperator('examples/quick_start/cgm.csv',
                     parse_dates=['dateTime']).process()[0]


op = SlidingWindowOperator(winsize="1h15t",
                           time_col="dateTime",
                           label_col="CGM",
                           participant_identifier="patientID")

df_timeseries, df_labels, df_label_time, df_pids = op.process(df)[0]

op = TSFreshFeatureExtractorOperator(seq_id_col="seq_id", date_feature_name='dateTime', value_col='CGM')
features = op.process(df_timeseries)[0]
features.dropna(axis=1)

Note that TSFreshFeatureExtractorOperator requires a seq_id column that indicates which entity each time series row belongs to. Features are extracted individually for each entity (id), and the resulting feature matrix contains one row per id. The column can be created manually or via SlidingWindowOperator, as sketched below.
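
For illustration, a seq_id can be created manually by reusing an identifier column (a minimal sketch based on the CGM example's column names):

# One sequence per participant: reuse the participant identifier as seq_id
df['seq_id'] = df['patientID']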

Filter data

Filter rows, days, or participants with a custom condition using FilterOperator:

from tasrif.processing_pipeline.pandas import ReadCsvOperator
from tasrif.processing_pipeline.custom import FilterOperator

df = ReadCsvOperator('examples/quick_start/filter_example.csv',
                     parse_dates=['Hours']).process()[0]

operator = FilterOperator(participant_identifier="Id",
                          date_feature_name="Hours",
                          epoch_filter=lambda df: df['Steps'] > 10,
                          day_filter={
                              "column": "Hours",
                              "filter": lambda x: x.count() < 10,
                              "consecutive_days": (7, 12) # 7 minimum consecutive days, and 12 max
                          },
                          filter_type="include")

operator.process(df)[0]

Wrangle data

Add a column using CreateFeatureOperator

import pandas as pd
from pandas import Timestamp
from tasrif.processing_pipeline.custom import CreateFeatureOperator

df = pd.DataFrame([
 [Timestamp('2016-12-31 00:00:00'), Timestamp('2017-01-01 09:03:00'), 5470, 2968, 1],
 [Timestamp('2017-01-01 00:00:00'), Timestamp('2017-01-01 23:44:00'), 9769, 2073, 1],
 [Timestamp('2017-01-02 00:00:00'), Timestamp('2017-01-02 16:54:00'), 9444, 2883, 1],
 [Timestamp('2017-01-03 00:00:00'), Timestamp('2017-01-05 22:49:00'), 20064, 2287, 1],
 [Timestamp('2017-01-04 00:00:00'), Timestamp('2017-01-06 07:27:00'),16771, 2716, 1]],
    columns = ['startTime', 'endTime', 'steps', 'calories', 'personId']
)

operator = CreateFeatureOperator(
   feature_name="duration",
   feature_creator=lambda df: df['endTime'] - df['startTime'])

operator.process(df)[0]

Upsample or downsample date features using ResampleOperator. The first argument, rule, accepts pandas offset aliases such as min (minutes), H (hours), and D (days). See the pandas resampling documentation for details.

from tasrif.processing_pipeline.pandas import ReadCsvOperator
from tasrif.processing_pipeline.custom import ResampleOperator

df = ReadCsvOperator('examples/quick_start/sleep.csv',
                     parse_dates=['timestamp'],
                     index_col=['timestamp']).process()[0]

op = ResampleOperator('D', {'sleep_level': 'mean'})
op.process(df)

Note that, currently, the index of the dataframe has to be of type DatetimeIndex for ResampleOperator to work correctly.
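
If the timestamp still lives in a regular column, it can be promoted to the index first (a minimal sketch using SetIndexOperator; assumes the column is already parsed as datetimes):

from tasrif.processing_pipeline.pandas import SetIndexOperator

# Move the timestamp column into the index so ResampleOperator sees a DatetimeIndex
df = SetIndexOperator('timestamp').process(df)[0]

Set the start hour of the day to some hour using SetStartHourOfDayOperator: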

from tasrif.processing_pipeline.pandas import ReadCsvOperator
from tasrif.processing_pipeline.custom import SetStartHourOfDayOperator

df = ReadCsvOperator('examples/quick_start/filter_example.csv',
                     parse_dates=['Hours']).process()[0]

operator = SetStartHourOfDayOperator(date_feature_name='Hours',
                                     participant_identifier='Id',
                                     shift=6)
operator.process(df)[0]

A new column shifted_time_col will be created. This can be useful when the user wants to calculate statistics over redefined day boundaries instead of midnight-to-midnight (e.g. 8:00 AM - 8:00 AM).

Concatenate multiple dataframes or a generator of dataframes using ConcatOperator:

import pandas as pd
from pandas import Timestamp
from tasrif.processing_pipeline.pandas import ConcatOperator

df = pd.DataFrame([
 [Timestamp('2016-12-31 00:00:00'), Timestamp('2017-01-01 09:03:00'), 5470, 2968, 1],
 [Timestamp('2017-01-01 00:00:00'), Timestamp('2017-01-01 23:44:00'), 9769, 2073, 1],
 [Timestamp('2017-01-02 00:00:00'), Timestamp('2017-01-02 16:54:00'), 9444, 2883, 1],
 [Timestamp('2017-01-03 00:00:00'), Timestamp('2017-01-05 22:49:00'), 20064, 2287, 1],
 [Timestamp('2017-01-04 00:00:00'), Timestamp('2017-01-06 07:27:00'),16771, 2716, 1]],
    columns = ['startTime', 'endTime', 'steps', 'calories', 'personId']
)

df1 = df.copy()
df2 = df.copy()

concatenated_df = ConcatOperator().process(df1, df2)[0]

Normalize selected columns

import pandas as pd
from tasrif.processing_pipeline.custom import NormalizeOperator
from tasrif.processing_pipeline.custom import NormalizeTransformOperator

df = pd.DataFrame([
    [1, "2020-05-01 00:00:00", 10],
    [1, "2020-05-01 01:00:00", 15],
    [1, "2020-05-01 03:00:00", 23],
    [2, "2020-05-02 00:00:00", 17],
    [2, "2020-05-02 01:00:00", 11]],
    columns=['logId', 'timestamp', 'sleep_level'])

op = NormalizeOperator('all', 'minmax', {'feature_range': (0, 2)})
output = op.process(df)

Use the fitted normalizer on different data using NormalizeTransformOperator:

trained_model = output[0][1]

op = NormalizeTransformOperator('all', trained_model)

output = op.process(df)
output

Use AggregateActivityDatesOperator to view the start and end dates per participant in a dataframe that has one date per row.

import pandas as pd

from tasrif.processing_pipeline.custom import AggregateActivityDatesOperator
from tasrif.processing_pipeline.pandas import ReadCsvOperator

reader = ReadCsvOperator('examples/quick_start/activity_long.csv')
df = reader.process()[0]

operator = AggregateActivityDatesOperator(date_feature_name="date",
                                          participant_identifier=['Id', 'logId'])
df = operator.process(df)[0]
df

You can use JqOperator to process JSON data:

import pandas as pd
from tasrif.processing_pipeline.custom import JqOperator
df = [
  {
    "date": "2020-01-01",
    "sleep": [
      {
        "sleep_data": [
          {
            "level": "rem",
            "minutes": 180
          },
          {
            "level": "deep",
            "minutes": 80
          },
          {
            "level": "light",
            "minutes": 300
          }
        ]
      }
    ]
  },
  {
    "date": "2020-01-02",
    "sleep": [
      {
        "sleep_data": [
          {
            "level": "rem",
            "minutes": 280
          },
          {
            "level": "deep",
            "minutes": 60
          },
          {
            "level": "light",
            "minutes": 200
          }
        ]
      }
    ]
  }
]

op = JqOperator("map({date, sleep: .sleep[].sleep_data})")

op.process(df)

Test prepared data

See if your prepared data can serve as input to a machine learning model:

import pandas as pd
from tasrif.processing_pipeline.custom import LinearFitOperator

df = pd.DataFrame([
    [1, "2020-05-01 00:00:00", 10, 'poor'],
    [1, "2020-05-01 01:00:00", 15, 'poor'],
    [1, "2020-05-01 03:00:00", 23, 'good'],
    [2, "2020-05-02 00:00:00", 17, 'good'],
    [2, "2020-05-02 01:00:00", 11, 'poor']],
    columns=['logId', 'timestamp', 'sleep_level', 'sleep_quality'])

op = LinearFitOperator(feature_names='sleep_level',
                       target='sleep_quality',
                       target_type='categorical')
op.process(df)

Create a pipeline to link the operators

Chain operators using SequenceOperator

import pandas as pd
from tasrif.processing_pipeline import SequenceOperator
from tasrif.processing_pipeline.custom import AggregateOperator, CreateFeatureOperator, SetStartHourOfDayOperator
from tasrif.processing_pipeline.pandas import ConvertToDatetimeOperator, SortOperator, ReadCsvOperator

df = ReadCsvOperator('examples/quick_start/cgm.csv').process()[0]

df

pipeline = SequenceOperator([
    ConvertToDatetimeOperator(feature_names=["dateTime"]),
    SetStartHourOfDayOperator(date_feature_name='dateTime',
                              participant_identifier='patientID',
                              shift=6),
    SortOperator(by='dateTime'),
    AggregateOperator(groupby_feature_names="patientID",
                      aggregation_definition={"CGM": ["mean", "std"]})
])

pipeline.process(df)

Debug your pipeline

Tasrif contains observers under tasrif/processing_pipeline/observers/ that are useful for seeing how operators change your data. For instance, you can print the head of the processed dataframe after every operator by passing an observer to the observers argument of SequenceOperator.

import pandas as pd
from tasrif.processing_pipeline.pandas import RenameOperator
from tasrif.processing_pipeline.observers import LoggingObserver
from tasrif.processing_pipeline import SequenceOperator

df = pd.DataFrame([
    [1, "2020-05-01 00:00:00", 1],
    [1, "2020-05-01 01:00:00", 1],
    [1, "2020-05-01 03:00:00", 2],
    [2, "2020-05-02 00:00:00", 1],
    [2, "2020-05-02 01:00:00", 1]],
    columns=['logId', 'timestamp', 'sleep_level'])

pipeline = SequenceOperator([RenameOperator(columns={"timestamp": "time"}),
                             RenameOperator(columns={"time": "time_difference"})],
                             observers=[LoggingObserver("head,tail")])
result = pipeline.process(df)[0]
result

Define a custom operator

Users can inherit from MapProcessingOperator to quickly build their own custom operators that perform map-like operations.

from tasrif.processing_pipeline.map_processing_operator import MapProcessingOperator

class SizeOperator(MapProcessingOperator):
    def _processing_function(self, df):
        return df.size
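
For example (a minimal sketch; assumes the custom operator exposes the same process interface as the built-in operators):

import pandas as pd

df = pd.DataFrame({'a': [1, 2, 3]})
SizeOperator().process(df)[0]  # 3, the number of cells in the dataframe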

Other references

  • You may examine tasrif/processing_pipeline/test_scripts/ for other minimal examples of Tasrif's operators.
  • Common Pandas functions can be found under tasrif/processing_pipeline/pandas/

Documentation

Tasrif's official documentation is hosted here: https://tasrif.qcri.org

You can build the docs locally, after installing the dependencies in setup.py and requirements.txt, by running:

cd docs
make html

You can then browse through them by opening docs/build/html/index.html in a browser.

Contributing

Contributors

This project is much stronger with your collaboration. Be part of it!
Thank you to all our amazing contributors!