Reduce the number of warnings emitted (#1947)
* Code updates to avoid several future & deprecation warnings
* Most of these are the result of recent package updates, specifically from `pydantic` and `langchain`; a sketch of the migration pattern follows this list
* Mark slow `MultiProcessingStage` tests with the `slow` marker
* Reduce the number of warnings reported when running `pytest` from 48 to 10
* Consolidate redundant fixtures
* Increase the CI time limit for tests to 90 minutes
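
A minimal sketch of the pydantic v1-to-v2 pattern applied throughout the diff below (the schema and field names here are hypothetical):

```python
from pydantic import BaseModel, ConfigDict, field_validator


class ExampleSchema(BaseModel):
    # pydantic v2 replaces the inner `class Config` with `model_config`
    model_config = ConfigDict(extra='forbid')

    feed_input: list[str]

    # pydantic v2 replaces `@validator(..., pre=True)` with
    # `@field_validator(..., mode='before')`
    @field_validator('feed_input', mode='before')
    @classmethod
    def coerce_to_list(cls, value):
        return [value] if isinstance(value, str) else value
```

With `extra='forbid'`, unexpected keyword arguments raise a `ValidationError` instead of being silently accepted; for example, `ExampleSchema(feed_input='a', typo=1)` fails while `ExampleSchema(feed_input='a')` validates to `feed_input=['a']`.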

## By submitting this PR I confirm:
- I am familiar with the [Contributing Guidelines](https://github.com/nv-morpheus/Morpheus/blob/main/docs/source/developer_guide/contributing.md).
- When the PR is ready for review, new or existing tests cover these changes.
- When the PR is ready for review, the documentation is up to date with these changes.

Authors:
  - David Gardner (https://github.com/dagardner-nv)

Approvers:
  - Michael Demoret (https://github.com/mdemoret-nv)

URL: #1947
dagardner-nv authored Oct 16, 2024
1 parent 34d2d48 commit 5692639
Showing 29 changed files with 123 additions and 101 deletions.
3 changes: 2 additions & 1 deletion .github/workflows/ci_pipe.yml
@@ -137,7 +137,8 @@ jobs:
   test:
     name: Test
     runs-on: linux-amd64-gpu-v100-latest-1
-    timeout-minutes: 60
+    # Consider lowering this back down to 60 minutes per https://github.com/nv-morpheus/Morpheus/issues/1948
+    timeout-minutes: 90
     container:
       credentials:
         username: '$oauthtoken'

2 changes: 1 addition & 1 deletion examples/gnn_fraud_detection_pipeline/stages/model.py
@@ -407,7 +407,7 @@ def load_model(model_dir: str,
                        n_layers=hyperparameters['n_layers'],
                        embedding_size=hyperparameters['embedding_size'],
                        target=hyperparameters['target_node']).to(device)
-    model.load_state_dict(torch.load(os.path.join(model_dir, 'model.pt')))
+    model.load_state_dict(torch.load(os.path.join(model_dir, 'model.pt'), weights_only=False))

    return model, graph, hyperparameters

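Recent PyTorch releases emit a `FutureWarning` when `torch.load` is called without `weights_only`, ahead of the default changing to `True`; passing the flag explicitly silences the warning. A minimal sketch (checkpoint path hypothetical):

```python
import torch

# weights_only=False keeps the legacy full-unpickling behavior, which is
# required when the checkpoint contains objects beyond plain tensors.
# Only do this for checkpoints from a trusted source.
state_dict = torch.load('model.pt', weights_only=False)
```
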
2 changes: 1 addition & 1 deletion examples/llm/common/utils.py
@@ -15,7 +15,7 @@
 import logging

 import pymilvus
-from langchain.embeddings import HuggingFaceEmbeddings # pylint: disable=no-name-in-module
+from langchain_community.embeddings import HuggingFaceEmbeddings

 from morpheus_llm.llm.services.llm_service import LLMService
 from morpheus_llm.llm.services.nemo_llm_service import NeMoLLMService
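
LangChain moved its third-party integrations into the separate `langchain-community` package; the old `langchain.embeddings` import paths now emit deprecation warnings. A minimal sketch of the updated import (model name hypothetical):

```python
from langchain_community.embeddings import HuggingFaceEmbeddings

embeddings = HuggingFaceEmbeddings(model_name="sentence-transformers/all-MiniLM-L6-v2")
vector = embeddings.embed_query("example text")  # returns a list of floats
```
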
2 changes: 1 addition & 1 deletion examples/llm/vdb_upload/langchain.py
@@ -16,9 +16,9 @@

 # pylint: disable=no-name-in-module
 from langchain.document_loaders.rss import RSSFeedLoader
-from langchain.embeddings.huggingface import HuggingFaceEmbeddings
 from langchain.text_splitter import RecursiveCharacterTextSplitter
 from langchain.vectorstores.milvus import Milvus
+from langchain_community.embeddings import HuggingFaceEmbeddings

 from examples.llm.vdb_upload.vdb_utils import DEFAULT_RSS_URLS
 from morpheus.utils.logging_timer import log_time

11 changes: 4 additions & 7 deletions examples/llm/vdb_upload/module/content_extractor_module.py
@@ -29,7 +29,8 @@
 import pypdfium2 as libpdfium
 from docx import Document
 from langchain.text_splitter import RecursiveCharacterTextSplitter
-from pydantic import BaseModel # pylint: disable=no-name-in-module
+from pydantic import BaseModel
+from pydantic import ConfigDict
 from pydantic import Field
 from pydantic import ValidationError
 from pydantic import field_validator
@@ -43,9 +44,7 @@ class CSVConverterSchema(BaseModel):
     chunk_overlap: int = 102 # Example default value
     chunk_size: int = 1024
     text_column_names: List[str]
-
-    class Config:
-        extra = "forbid"
+    model_config = ConfigDict(extra='forbid')


 class ContentExtractorSchema(BaseModel):
@@ -54,6 +53,7 @@ class ContentExtractorSchema(BaseModel):
     chunk_size: int = 512
     converters_meta: Dict[str, Dict] = Field(default_factory=dict)
     num_threads: int = 10
+    model_config = ConfigDict(extra='forbid')

     @field_validator('converters_meta', mode="before")
     @classmethod
@@ -66,9 +66,6 @@ def val_converters_meta(cls, to_validate: Dict[str, Dict]) -> Dict[str, Dict]:
             validated_meta[key] = value
         return validated_meta

-    class Config:
-        extra = "forbid"
-

 logger = logging.getLogger(__name__)

5 changes: 2 additions & 3 deletions examples/llm/vdb_upload/module/file_source_pipe.py
@@ -20,6 +20,7 @@

 import mrc
 from pydantic import BaseModel
+from pydantic import ConfigDict
 from pydantic import Field
 from pydantic import ValidationError

@@ -48,9 +49,7 @@ class FileSourcePipeSchema(BaseModel):
     vdb_resource_name: str
     watch: bool = False # Flag to watch file changes
     watch_interval: float = -5.0 # Interval to watch file changes
-
-    class Config:
-        extra = "forbid"
+    model_config = ConfigDict(extra='forbid')


 FileSourcePipeLoaderFactory = ModuleLoaderFactory("file_source_pipe", "morpheus_examples_llm", FileSourcePipeSchema)

9 changes: 4 additions & 5 deletions examples/llm/vdb_upload/module/rss_source_pipe.py
@@ -20,9 +20,10 @@

 import mrc
 from pydantic import BaseModel
+from pydantic import ConfigDict
 from pydantic import Field
 from pydantic import ValidationError
-from pydantic import validator
+from pydantic import field_validator

 from morpheus.modules.general.monitor import MonitorLoaderFactory
 from morpheus.modules.input.rss_source import RSSSourceLoaderFactory
@@ -52,8 +53,9 @@ class RSSSourcePipeSchema(BaseModel):
     strip_markup: bool = True
     vdb_resource_name: str
     web_scraper_config: Optional[Dict[Any, Any]] = None
+    model_config = ConfigDict(extra='forbid')

-    @validator('feed_input', pre=True)
+    @field_validator('feed_input')
     def validate_feed_input(cls, to_validate): # pylint: disable=no-self-argument
         if isinstance(to_validate, str):
             return [to_validate]
@@ -63,9 +65,6 @@ def validate_feed_input(cls, to_validate): # pylint: disable=no-self-argument

         raise ValueError('feed_input must be a string or a list of strings')

-    class Config:
-        extra = "forbid"
-

 RSSSourcePipeLoaderFactory = ModuleLoaderFactory("rss_source_pipe", "morpheus_examples_llm", RSSSourcePipeSchema)

9 changes: 3 additions & 6 deletions examples/llm/vdb_upload/module/schema_transform.py
@@ -20,6 +20,7 @@
 import mrc
 import mrc.core.operators as ops
 from pydantic import BaseModel
+from pydantic import ConfigDict
 from pydantic import Field
 from pydantic import ValidationError

@@ -39,16 +40,12 @@ class ColumnTransformSchema(BaseModel):
     dtype: str
     op_type: str
     from_: Optional[str] = Field(None, alias="from")
-
-    class Config:
-        extra = "forbid"
+    model_config = ConfigDict(extra='forbid')


 class SchemaTransformSchema(BaseModel):
     schema_transform_config: Dict[str, Dict[str, Any]] = Field(default_factory=dict)
-
-    class Config:
-        extra = "forbid"
+    model_config = ConfigDict(extra='forbid')


 SchemaTransformLoaderFactory = ModuleLoaderFactory("schema_transform", "morpheus_examples_llm", SchemaTransformSchema)

5 changes: 2 additions & 3 deletions examples/llm/vdb_upload/module/vdb_resource_tagging_module.py
@@ -16,6 +16,7 @@

 import mrc
 from pydantic import BaseModel
+from pydantic import ConfigDict
 from pydantic import ValidationError

 from morpheus.messages import ControlMessage
@@ -27,9 +28,7 @@

 class VDBResourceTaggingSchema(BaseModel):
     vdb_resource_name: str
-
-    class Config:
-        extra = "forbid"
+    model_config = ConfigDict(extra='forbid')


 VDBResourceTaggingLoaderFactory = ModuleLoaderFactory("vdb_resource_tagging",

7 changes: 3 additions & 4 deletions examples/llm/vdb_upload/module/web_scraper_module.py
@@ -22,7 +22,8 @@
 import requests_cache
 from bs4 import BeautifulSoup
 from langchain.text_splitter import RecursiveCharacterTextSplitter
-from pydantic import BaseModel # pylint: disable=no-name-in-module
+from pydantic import BaseModel
+from pydantic import ConfigDict
 from pydantic import ValidationError

 import cudf
@@ -41,9 +42,7 @@ class WebScraperSchema(BaseModel):
     enable_cache: bool = False
     cache_path: str = "./.cache/http/RSSDownloadStage.sqlite"
     cache_dir: str = "./.cache/llm/rss"
-
-    class Config:
-        extra = "forbid"
+    model_config = ConfigDict(extra='forbid')


 WebScraperLoaderFactory = ModuleLoaderFactory("web_scraper", "morpheus_examples_llm", WebScraperSchema)

4 changes: 3 additions & 1 deletion pyproject.toml
@@ -41,7 +41,9 @@ filterwarnings = [

 testpaths = ["tests"]

-addopts = "--benchmark-disable"
+# Don't run the benchmarks by default, don't search for tests in the tests/_utils directory which will trigger false
+# alarms
+addopts = "--benchmark-disable --ignore=tests/_utils"

 asyncio_mode = "auto"

@@ -18,6 +18,7 @@
 from typing import Optional

 from pydantic import BaseModel
+from pydantic import ConfigDict

 logger = logging.getLogger(__name__)

@@ -30,6 +31,4 @@ class DeserializeSchema(BaseModel):
     batch_size: int = 1024
     max_concurrency: int = 1
     should_log_timestamp: bool = True
-
-    class Config:
-        extra = "forbid"
+    model_config = ConfigDict(extra='forbid')

@@ -16,6 +16,7 @@
 from typing import List

 from pydantic import BaseModel
+from pydantic import ConfigDict
 from pydantic import Field

 logger = logging.getLogger(f"morpheus.{__name__}")
@@ -26,6 +27,4 @@ class MultiFileSourceSchema(BaseModel):
     watch_dir: bool = False
     watch_interval: float = 1.0
     batch_size: int = 128
-
-    class Config:
-        extra = "forbid"
+    model_config = ConfigDict(extra='forbid')

5 changes: 2 additions & 3 deletions python/morpheus/morpheus/modules/schemas/rss_source_schema.py
@@ -16,6 +16,7 @@
 from typing import List

 from pydantic import BaseModel
+from pydantic import ConfigDict
 from pydantic import Field

 logger = logging.getLogger(__name__)
@@ -32,6 +33,4 @@ class RSSSourceSchema(BaseModel):
     interval_sec: int = 600
     stop_after_rec: int = 0
     strip_markup: bool = True
-
-    class Config:
-        extra = "forbid"
+    model_config = ConfigDict(extra='forbid')

2 changes: 1 addition & 1 deletion python/morpheus/morpheus/stages/input/arxiv_source.py
@@ -160,7 +160,7 @@ def _generate_frames(self, subscription: mrc.Subscription):

     def _process_pages(self, pdf_path: str):
         try:
-            from langchain.document_loaders import PyPDFLoader
+            from langchain_community.document_loaders import PyPDFLoader
             from pypdf.errors import PdfStreamError
         except ImportError as exc:
             raise ImportError(IMPORT_ERROR_MESSAGE) from exc

@@ -19,6 +19,7 @@
 import time
 import typing
 from http import HTTPStatus
+from io import StringIO

 import mrc

@@ -166,7 +167,7 @@ def _parse_payload(self, payload: str) -> HttpParseResponse:
                df = self._payload_to_df_fn(payload, self._lines)
            else:
                # engine='cudf' is needed when lines=False to avoid using pandas
-               df = cudf.read_json(payload, lines=self._lines, engine='cudf')
+               df = cudf.read_json(StringIO(initial_value=payload), lines=self._lines, engine='cudf')

        except Exception as e:
            err_msg = "Error occurred converting HTTP payload to Dataframe"
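
Passing a raw JSON string straight to `read_json` is deprecated in recent pandas releases, and cuDF mirrors the pandas API; wrapping the payload in `StringIO` marks it unambiguously as literal data rather than a file path. A minimal sketch:

```python
from io import StringIO

import cudf

payload = '[{"a": 1}, {"a": 2}]'
# Wrapping in StringIO avoids the literal-JSON deprecation warning
df = cudf.read_json(StringIO(payload), lines=False, engine='cudf')
```
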
2 changes: 1 addition & 1 deletion python/morpheus/morpheus/utils/column_info.py
@@ -381,7 +381,7 @@ def _process_column(self, df: pd.DataFrame) -> pd.Series:
         The processed column as a datetime Series.
         """

-        dt_series = pd.to_datetime(df[self.input_name], infer_datetime_format=True, utc=True)
+        dt_series = pd.to_datetime(df[self.input_name], utc=True)

         dtype = self.get_pandas_dtype()
         if dtype == 'datetime64[ns]':
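
pandas 2.0 deprecated `infer_datetime_format` (format inference now happens by default), so passing the argument only triggers a deprecation warning. A minimal sketch:

```python
import pandas as pd

timestamps = pd.Series(["2024-10-16T12:00:00Z", "2024-10-16T13:30:00Z"])
# Format inference is automatic in pandas >= 2.0; no extra flag needed
dt_series = pd.to_datetime(timestamps, utc=True)
```
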
@@ -15,8 +15,9 @@
 import logging

 from pydantic import BaseModel
+from pydantic import ConfigDict
 from pydantic import Field
-from pydantic import validator
+from pydantic import field_validator

 from morpheus.utils.module_ids import MORPHEUS_MODULE_NAMESPACE
 from morpheus.utils.module_ids import WRITE_TO_VECTOR_DB
@@ -38,18 +39,16 @@ class WriteToVDBSchema(BaseModel):
     service_kwargs: dict = Field(default_factory=dict)
     batch_size: int = 1024
     write_time_interval: float = 1.0
+    model_config = ConfigDict(extra='forbid')

-    @validator('service', pre=True)
+    @field_validator('service')
     def validate_service(cls, to_validate): # pylint: disable=no-self-argument
         if not to_validate:
             raise ValueError("Service must be a service name or a serialized instance of VectorDBService")
         return to_validate

-    @validator('default_resource_name', pre=True)
+    @field_validator('default_resource_name')
     def validate_resource_name(cls, to_validate): # pylint: disable=no-self-argument
         if not to_validate:
             raise ValueError("Resource name must not be None or Empty.")
         return to_validate
-
-    class Config:
-        extra = "forbid"

1 change: 0 additions & 1 deletion tests/_utils/test_directories.py
@@ -33,4 +33,3 @@ def __init__(self, tests_dir) -> None:
         self.tests_data_dir = os.path.join(self.tests_dir, 'tests_data')
         self.mock_triton_servers_dir = os.path.join(self.tests_dir, 'mock_triton_server')
         self.mock_rest_server = os.path.join(self.tests_dir, 'mock_rest_server')
-        print(self)

29 changes: 28 additions & 1 deletion tests/conftest.py
@@ -36,7 +36,6 @@
 from _utils.kafka import kafka_bootstrap_servers_fixture # noqa: F401 pylint:disable=unused-import
 from _utils.kafka import kafka_consumer_fixture # noqa: F401 pylint:disable=unused-import
 from _utils.kafka import kafka_topics_fixture # noqa: F401 pylint:disable=unused-import
-from morpheus.utils.shared_process_pool import SharedProcessPool

 # Don't let pylint complain about pytest fixtures
 # pylint: disable=redefined-outer-name,unused-argument
@@ -1081,6 +1080,32 @@ def openai_fixture(fail_missing: bool):
     yield import_or_skip("openai", reason=OPT_DEP_SKIP_REASON.format(package="openai"), fail_missing=fail_missing)


+@pytest.fixture(scope='session')
+def dask_distributed(fail_missing: bool):
+    """
+    Mark tests requiring dask.distributed
+    """
+    yield import_or_skip("dask.distributed",
+                         reason=OPT_DEP_SKIP_REASON.format(package="dask.distributed"),
+                         fail_missing=fail_missing)
+
+
+@pytest.fixture(scope='session')
+def dask_cuda(fail_missing: bool):
+    """
+    Mark tests requiring dask_cuda
+    """
+    yield import_or_skip("dask_cuda", reason=OPT_DEP_SKIP_REASON.format(package="dask_cuda"), fail_missing=fail_missing)
+
+
+@pytest.fixture(scope='session')
+def mlflow(fail_missing: bool):
+    """
+    Mark tests requiring mlflow
+    """
+    yield import_or_skip("mlflow", reason=OPT_DEP_SKIP_REASON.format(package="mlflow"), fail_missing=fail_missing)
+
+
 @pytest.fixture(name="langchain", scope='session')
 def langchain_fixture(fail_missing: bool):
     """
@@ -1168,6 +1193,8 @@ def mock_subscription_fixture():
 # Any tests that use the SharedProcessPool should use this fixture
 @pytest.fixture(scope="module")
 def shared_process_pool_setup_and_teardown():
+    from morpheus.utils.shared_process_pool import SharedProcessPool
+
     # Set lower CPU usage for unit test to avoid slowing down the test
     os.environ["MORPHEUS_SHARED_PROCESS_POOL_CPU_USAGE"] = "0.1"

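The new `dask_distributed`, `dask_cuda`, and `mlflow` fixtures reuse the repository's `import_or_skip` helper from `_utils`. A minimal sketch of what such a helper does (an assumption for illustration, not the repo's actual implementation):

```python
import importlib
import types

import pytest


def import_or_skip(modname: str, reason: str, fail_missing: bool = False) -> types.ModuleType:
    """Import `modname`, skipping the requesting test when the package is
    absent, or failing outright when `fail_missing` is set (e.g. in CI,
    where optional dependencies are expected to be installed)."""
    try:
        return importlib.import_module(modname)
    except ImportError as exc:
        if fail_missing:
            raise RuntimeError(reason) from exc
        pytest.skip(reason)
```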