From 1fa55cae9f2c60bddd178a881ab3a2ca87a544f9 Mon Sep 17 00:00:00 2001
From: Particular Miner <78448465+ParticularMiner@users.noreply.github.com>
Date: Mon, 25 Oct 2021 16:38:14 +0200
Subject: [PATCH] removed all unnecessary calls to compute()

---
 .gitignore                         |  2 -
 dask_ml/feature_extraction/text.py | 90 ++++++++++++++++--------------
 2 files changed, 47 insertions(+), 45 deletions(-)

diff --git a/.gitignore b/.gitignore
index b7707ff9a..4c9132fad 100644
--- a/.gitignore
+++ b/.gitignore
@@ -122,5 +122,3 @@ docs/source/auto_examples/
 docs/source/examples/mydask.png
 
 dask-worker-space
-/.project
-/.pydevproject
diff --git a/dask_ml/feature_extraction/text.py b/dask_ml/feature_extraction/text.py
index 169d28d43..43827db0b 100644
--- a/dask_ml/feature_extraction/text.py
+++ b/dask_ml/feature_extraction/text.py
@@ -120,6 +120,18 @@ def _hasher(self):
     return sklearn.feature_extraction.text.FeatureHasher
 
 
+def _n_samples(X):
+    """Count the number of samples in dask.array.Array X."""
+    def chunk_n_samples(chunk, axis, keepdims):
+        return np.array([chunk.shape[0]], dtype=np.int64)
+
+    return da.reduction(X,
+                        chunk=chunk_n_samples,
+                        aggregate=np.sum,
+                        concatenate=False,
+                        dtype=np.int64)
+
+
 def _document_frequency(X, dtype):
     """Count the number of non-zero values for each feature in dask array X."""
     def chunk_doc_freq(chunk, axis, keepdims):
@@ -133,7 +145,7 @@ def chunk_doc_freq(chunk, axis, keepdims):
                         aggregate=np.sum,
                         axis=0,
                         concatenate=False,
-                        dtype=dtype).compute().astype(dtype)
+                        dtype=dtype)
 
 
 class CountVectorizer(sklearn.feature_extraction.text.CountVectorizer):
@@ -203,17 +215,19 @@ class CountVectorizer(sklearn.feature_extraction.text.CountVectorizer):
     ['and', 'document', 'first', 'is', 'one', 'second', 'the', 'third', 'this']
     """
 
-    def fit_transform(self, raw_documents, y=None):
+    def get_params(self):
         # Note that in general 'self' could refer to an instance of either this
         # class or a subclass of this class. Hence it is possible that
         # self.get_params() could get unexpected parameters of an instance of a
         # subclass. Such parameters need to be excluded here:
-        subclass_instance_params = self.get_params()
+        subclass_instance_params = super().get_params()
         excluded_keys = getattr(self, '_non_CountVectorizer_params', [])
-        params = {key: subclass_instance_params[key]
-                  for key in subclass_instance_params
-                  if key not in excluded_keys}
+        return {key: subclass_instance_params[key]
+                for key in subclass_instance_params
+                if key not in excluded_keys}
 
+    def fit_transform(self, raw_documents, y=None):
+        params = self.get_params()
         vocabulary = params.pop("vocabulary")
         vocabulary_for_transform = vocabulary
 
@@ -227,12 +241,12 @@ def fit_transform(self, raw_documents, y=None):
             # Case 2: learn vocabulary from the data.
             vocabularies = raw_documents.map_partitions(_build_vocabulary, params)
             vocabulary = vocabulary_for_transform = (
-                _merge_vocabulary( *vocabularies.to_delayed() ))
+                _merge_vocabulary(*vocabularies.to_delayed()))
             vocabulary_for_transform = vocabulary_for_transform.persist()
             vocabulary_ = vocabulary.compute()
             n_features = len(vocabulary_)
 
-        meta = scipy.sparse.eye(0, format="csr", dtype=self.dtype)
+        meta = scipy.sparse.csr_matrix((0, n_features), dtype=self.dtype)
         if isinstance(raw_documents, dd.Series):
             result = raw_documents.map_partitions(
                 _count_vectorizer_transform, vocabulary_for_transform,
@@ -241,7 +255,6 @@ def fit_transform(self, raw_documents, y=None):
             result = raw_documents.map_partitions(
                 _count_vectorizer_transform, vocabulary_for_transform, params)
             result = build_array(result, n_features, meta)
-            result.compute_chunk_sizes()
 
         self.vocabulary_ = vocabulary_
         self.fixed_vocabulary_ = fixed_vocabulary
@@ -249,15 +262,7 @@ def fit_transform(self, raw_documents, y=None):
         return result
 
     def transform(self, raw_documents):
-        # Note that in general 'self' could refer to an instance of either this
-        # class or a subclass of this class. Hence it is possible that
-        # self.get_params() could get unexpected parameters of an instance of a
-        # subclass. Such parameters need to be excluded here:
-        subclass_instance_params = self.get_params()
-        excluded_keys = getattr(self, '_non_CountVectorizer_params', [])
-        params = {key: subclass_instance_params[key]
-                  for key in subclass_instance_params
-                  if key not in excluded_keys}
+        params = self.get_params()
 
         vocabulary = params.pop("vocabulary")
         if vocabulary is None:
@@ -271,14 +276,13 @@ def transform(self, raw_documents):
             except ValueError:
                 vocabulary_for_transform = dask.delayed(vocabulary)
             else:
-                (vocabulary_for_transform,) = client.scatter(
-                    (vocabulary,), broadcast=True
-                )
+                (vocabulary_for_transform,) = client.scatter((vocabulary,),
+                                                             broadcast=True)
         else:
             vocabulary_for_transform = vocabulary
 
         n_features = vocabulary_length(vocabulary_for_transform)
-        meta = scipy.sparse.eye(0, format="csr", dtype=self.dtype)
+        meta = scipy.sparse.csr_matrix((0, n_features), dtype=self.dtype)
         if isinstance(raw_documents, dd.Series):
             result = raw_documents.map_partitions(
                 _count_vectorizer_transform, vocabulary_for_transform,
@@ -287,7 +291,6 @@ def transform(self, raw_documents):
             transformed = raw_documents.map_partitions(
                 _count_vectorizer_transform, vocabulary_for_transform, params)
             result = build_array(transformed, n_features, meta)
-            result.compute_chunk_sizes()
         return result
 
 class TfidfTransformer(sklearn.feature_extraction.text.TfidfTransformer):
@@ -331,15 +334,10 @@ def fit(self, X, y=None):
         X : sparse matrix of shape n_samples, n_features)
             A matrix of term/token counts.
         """
-        # X = check_array(X, accept_sparse=('csr', 'csc'))
-        # if not sp.issparse(X):
-        #     X = sp.csr_matrix(X)
-        dtype = X.dtype if X.dtype in FLOAT_DTYPES else np.float64
-
-        if self.use_idf:
-            n_samples, n_features = X.shape
+        def get_idf_diag(X, dtype):
+            n_samples = _n_samples(X)  # X.shape[0] is not yet known
+            n_features = X.shape[1]
             df = _document_frequency(X, dtype)
-            # df = df.astype(dtype, **_astype_copy_false(df))
 
             # perform idf smoothing if required
             df += int(self.smooth_idf)
@@ -347,14 +345,12 @@ def fit(self, X, y=None):
 
             # log+1 instead of log makes sure terms with zero idf don't get
             # suppressed entirely.
-            idf = np.log(n_samples / df) + 1
-            self._idf_diag = scipy.sparse.diags(
-                idf,
-                offsets=0,
-                shape=(n_features, n_features),
-                format="csr",
-                dtype=dtype,
-            )
+            return np.log(n_samples / df) + 1
+
+        dtype = X.dtype if X.dtype in FLOAT_DTYPES else np.float64
+
+        if self.use_idf:
+            self._idf_diag = get_idf_diag(X, dtype)
 
         return self
 
@@ -404,8 +400,17 @@ def _dot_idf_diag(chunk):
             # idf_ being a property, the automatic attributes detection
             # does not work as usual and we need to specify the attribute
            # name:
-            check_is_fitted(self, attributes=["idf_"], msg="idf vector is not fitted")
-
+            check_is_fitted(self, attributes=["idf_"],
+                            msg="idf vector is not fitted")
+            if dask.is_dask_collection(self._idf_diag):
+                _idf_diag = self._idf_diag.compute()
+                n_features = len(_idf_diag)
+                self._idf_diag = scipy.sparse.diags(
+                    _idf_diag,
+                    offsets=0,
+                    shape=(n_features, n_features),
+                    format="csr",
+                    dtype=_idf_diag.dtype)
             X = X.map_blocks(_dot_idf_diag, dtype=np.float64, meta=meta)
 
         if self.norm:
@@ -619,8 +624,7 @@ def fit(self, raw_documents, y=None):
         """
         self._check_params()
         self._warn_for_unused_params()
-        X = super().fit_transform(raw_documents,
-                                  y=self._non_CountVectorizer_params)
+        X = super().fit_transform(raw_documents)
        self._tfidf.fit(X)
         return self
 
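
Note on the central pattern of this patch: eager ".compute()" calls are
replaced by dask reductions that stay lazy until a value is genuinely
needed. Below is a minimal, self-contained sketch of the row-counting
reduction that the new _n_samples helper performs. The toy array, its
chunk sizes, and the helper name n_samples are illustrative only, not
part of the patch:

    import numpy as np
    import dask.array as da

    def n_samples(X):
        """Lazily count the rows of a dask array, one chunk at a time."""
        def chunk_n_samples(chunk, axis, keepdims):
            # Each block contributes its local row count.
            return np.array([chunk.shape[0]], dtype=np.int64)

        # aggregate=np.sum adds up the per-block counts; concatenate=False
        # hands np.sum the raw list of partial results instead of trying
        # to concatenate them first.
        return da.reduction(X,
                            chunk=chunk_n_samples,
                            aggregate=np.sum,
                            concatenate=False,
                            dtype=np.int64)

    X = da.ones((10, 4), chunks=(3, 4))
    n = n_samples(X)     # still lazy: no chunk has been touched yet
    print(n.compute())   # 10 (possibly wrapped in a length-1 array)

The result can be combined with other lazy quantities (as in
get_idf_diag above, where n_samples feeds np.log(n_samples / df) + 1)
without forcing any part of the graph to run.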
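The switch from scipy.sparse.eye(0, ...) to
scipy.sparse.csr_matrix((0, n_features), ...) as the map_partitions meta
supports the same goal: the meta now advertises the true column count,
so the dask array built from the blocks knows X.shape[1] without
computing anything, which is exactly what TfidfTransformer.fit relies
on. A small illustration, with a made-up feature count:

    import scipy.sparse

    n_features = 9  # illustrative; in the patch it comes from the vocabulary

    old_meta = scipy.sparse.eye(0, format="csr")          # shape (0, 0)
    new_meta = scipy.sparse.csr_matrix((0, n_features))   # shape (0, 9)
    print(old_meta.shape, new_meta.shape)                 # (0, 0) (0, 9)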
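The one compute() that remains on the idf path is deferred from fit()
to the first transform() call. Restated as a standalone sketch (the
function name materialize_idf_diag is ours, purely for illustration):

    import dask
    import scipy.sparse

    def materialize_idf_diag(idf_diag):
        """Turn a lazy idf vector into a concrete sparse diagonal matrix.

        Mirrors the deferred step in TfidfTransformer.transform() above:
        fit() stores a lazy vector, and the single compute() happens only
        once transform() actually needs the matrix.
        """
        if dask.is_dask_collection(idf_diag):
            idf = idf_diag.compute()
            n = len(idf)
            idf_diag = scipy.sparse.diags(idf, offsets=0, shape=(n, n),
                                          format="csr", dtype=idf.dtype)
        return idf_diag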