diff --git a/openml/_api_calls.py b/openml/_api_calls.py
index d451ac5c8..aee67d8c6 100644
--- a/openml/_api_calls.py
+++ b/openml/_api_calls.py
@@ -3,10 +3,14 @@
 import time
 import hashlib
 import logging
+import pathlib
 import requests
+import urllib.parse
 import xml
 import xmltodict
-from typing import Dict, Optional
+from typing import Dict, Optional, Union
+
+import minio
 
 from . import config
 from .exceptions import (
@@ -68,6 +72,45 @@ def _perform_api_call(call, request_method, data=None, file_elements=None):
     return response.text
 
 
+def _download_minio_file(
+    source: str, destination: Union[str, pathlib.Path], exists_ok: bool = True,
+) -> None:
+    """ Download file ``source`` from a MinIO bucket and store it at ``destination``.
+
+    Parameters
+    ----------
+    source : str
+        URL to a file in a MinIO bucket.
+    destination : str or pathlib.Path
+        Path to store the file to; if a directory is provided, the original filename is used.
+    exists_ok : bool, optional (default=True)
+        If False, raise ``FileExistsError`` if a file already exists in ``destination``.
+
+    """
+    destination = pathlib.Path(destination)
+    parsed_url = urllib.parse.urlparse(source)
+
+    # expect path format: /BUCKET/path/to/file.ext
+    bucket, object_name = parsed_url.path[1:].split("/", maxsplit=1)
+    if destination.is_dir():
+        destination = pathlib.Path(destination, object_name)
+    if destination.is_file() and not exists_ok:
+        raise FileExistsError(f"File already exists in {destination}.")
+
+    client = minio.Minio(endpoint=parsed_url.netloc, secure=False)
+
+    try:
+        client.fget_object(
+            bucket_name=bucket, object_name=object_name, file_path=str(destination),
+        )
+    except minio.error.S3Error as e:
+        if e.message.startswith("Object does not exist"):
+            raise FileNotFoundError(f"Object at '{source}' does not exist.") from e
+        # e.g. a permission error, or a bucket that does not exist (which is also
+        # interpreted as a permission error on the MinIO level).
+        raise FileNotFoundError("Bucket does not exist or is private.") from e
+
+
 def _download_text_file(
     source: str,
     output_path: Optional[str] = None,
diff --git a/openml/datasets/dataset.py b/openml/datasets/dataset.py
index e79bcbf4e..fd13a8e8c 100644
--- a/openml/datasets/dataset.py
+++ b/openml/datasets/dataset.py
@@ -96,6 +96,10 @@ class OpenMLDataset(OpenMLBase):
         which maps a quality name to a quality value.
     dataset: string, optional
         Serialized arff dataset string.
+    minio_url: string, optional
+        URL to the MinIO bucket with the dataset files.
+    parquet_file: string, optional
+        Path to the local parquet file.
     """
 
     def __init__(
@@ -128,6 +132,8 @@ def __init__(
         features_file: Optional[str] = None,
         qualities_file: Optional[str] = None,
         dataset=None,
+        minio_url: Optional[str] = None,
+        parquet_file: Optional[str] = None,
     ):
         def find_invalid_characters(string, pattern):
             invalid_chars = set()
@@ -202,7 +208,9 @@ def find_invalid_characters(string, pattern):
         self.update_comment = update_comment
         self.md5_checksum = md5_checksum
         self.data_file = data_file
+        self.parquet_file = parquet_file
         self._dataset = dataset
+        self._minio_url = minio_url
 
         if features_file is not None:
             self.features = _read_features(
@@ -291,9 +299,11 @@ def __eq__(self, other):
     def _download_data(self) -> None:
         """ Download ARFF data file to standard cache directory. Set `self.data_file`. """
         # import required here to avoid circular import.
-        from .functions import _get_dataset_arff
+        from .functions import _get_dataset_arff, _get_dataset_parquet
 
         self.data_file = _get_dataset_arff(self)
+        if self._minio_url is not None:
+            self.parquet_file = _get_dataset_parquet(self)
 
     def _get_arff(self, format: str) -> Dict:
         """Read ARFF file and return decoded arff.
@@ -454,22 +464,38 @@ def _parse_data_from_arff(
         return X, categorical, attribute_names
 
     def _compressed_cache_file_paths(self, data_file: str) -> Tuple[str, str, str]:
-        data_pickle_file = data_file.replace(".arff", ".pkl.py3")
-        data_feather_file = data_file.replace(".arff", ".feather")
-        feather_attribute_file = data_file.replace(".arff", ".feather.attributes.pkl.py3")
+        ext = f".{data_file.split('.')[-1]}"
+        data_pickle_file = data_file.replace(ext, ".pkl.py3")
+        data_feather_file = data_file.replace(ext, ".feather")
+        feather_attribute_file = data_file.replace(ext, ".feather.attributes.pkl.py3")
         return data_pickle_file, data_feather_file, feather_attribute_file
 
-    def _cache_compressed_file_from_arff(
-        self, arff_file: str
+    def _cache_compressed_file_from_file(
+        self, data_file: str
     ) -> Tuple[Union[pd.DataFrame, scipy.sparse.csr_matrix], List[bool], List[str]]:
-        """ Store data from the arff file in compressed format. Sets cache_format to 'pickle' if data is sparse. """  # noqa: 501
+        """ Store data from the local file in compressed format.
+
+        If a local parquet file is present, it will be used instead of the ARFF file.
+        Sets cache_format to 'pickle' if data is sparse.
+        """
         (
            data_pickle_file,
            data_feather_file,
            feather_attribute_file,
-        ) = self._compressed_cache_file_paths(arff_file)
+        ) = self._compressed_cache_file_paths(data_file)
 
-        data, categorical, attribute_names = self._parse_data_from_arff(arff_file)
+        if data_file.endswith(".arff"):
+            data, categorical, attribute_names = self._parse_data_from_arff(data_file)
+        elif data_file.endswith(".pq"):
+            try:
+                data = pd.read_parquet(data_file)
+            except Exception as e:
+                raise Exception(f"File: {data_file}") from e
+
+            categorical = [data[c].dtype.name == "category" for c in data.columns]
+            attribute_names = list(data.columns)
+        else:
+            raise ValueError(f"Unknown file type for file '{data_file}'.")
 
         # Feather format does not work for sparse datasets, so we use pickle for sparse datasets
         if scipy.sparse.issparse(data):
@@ -480,12 +506,16 @@ def _cache_compressed_file_from_arff(
             data.to_feather(data_feather_file)
             with open(feather_attribute_file, "wb") as fh:
                 pickle.dump((categorical, attribute_names), fh, pickle.HIGHEST_PROTOCOL)
+            self.data_feather_file = data_feather_file
+            self.feather_attribute_file = feather_attribute_file
         else:
             with open(data_pickle_file, "wb") as fh:
                 pickle.dump((data, categorical, attribute_names), fh, pickle.HIGHEST_PROTOCOL)
+            self.data_pickle_file = data_pickle_file
 
         data_file = data_pickle_file if self.cache_format == "pickle" else data_feather_file
         logger.debug(f"Saved dataset {int(self.dataset_id or -1)}: {self.name} to file {data_file}")
+
         return data, categorical, attribute_names
 
     def _load_data(self):
@@ -496,10 +526,9 @@ def _load_data(self):
         if need_to_create_pickle or need_to_create_feather:
             if self.data_file is None:
                 self._download_data()
-            res = self._compressed_cache_file_paths(self.data_file)
-            self.data_pickle_file, self.data_feather_file, self.feather_attribute_file = res
-            # Since our recently stored data is exists in memory, there is no need to load from disk
-            return self._cache_compressed_file_from_arff(self.data_file)
+
+            file_to_load = self.data_file if self.parquet_file is None else self.parquet_file
+            return self._cache_compressed_file_from_file(file_to_load)
 
         # helper variable to help identify where errors occur
         fpath = self.data_feather_file if self.cache_format == "feather" else self.data_pickle_file
@@ -543,7 +572,8 @@ def _load_data(self):
             data_up_to_date = isinstance(data, pd.DataFrame) or scipy.sparse.issparse(data)
             if self.cache_format == "pickle" and not data_up_to_date:
                 logger.info("Updating outdated pickle file.")
-                return self._cache_compressed_file_from_arff(self.data_file)
+                file_to_load = self.data_file if self.parquet_file is None else self.parquet_file
+                return self._cache_compressed_file_from_file(file_to_load)
         return data, categorical, attribute_names
 
     @staticmethod
diff --git a/openml/datasets/functions.py b/openml/datasets/functions.py
index 3f930c2ea..a9840cc82 100644
--- a/openml/datasets/functions.py
+++ b/openml/datasets/functions.py
@@ -3,7 +3,7 @@
 import io
 import logging
 import os
-from typing import List, Dict, Union, Optional
+from typing import List, Dict, Union, Optional, cast
 
 import numpy as np
 import arff
@@ -424,6 +424,10 @@ def get_dataset(
                 raise
 
         arff_file = _get_dataset_arff(description) if download_data else None
+        if "oml:minio_url" in description and download_data:
+            parquet_file = _get_dataset_parquet(description)
+        else:
+            parquet_file = None
         remove_dataset_cache = False
     except OpenMLServerException as e:
         # if there was an exception,
@@ -437,7 +441,7 @@ def get_dataset(
             _remove_cache_dir_for_id(DATASETS_CACHE_DIR_NAME, did_cache_dir)
 
     dataset = _create_dataset_from_description(
-        description, features_file, qualities_file, arff_file, cache_format
+        description, features_file, qualities_file, arff_file, parquet_file, cache_format
     )
     return dataset
 
@@ -908,6 +912,55 @@ def _get_dataset_description(did_cache_dir, dataset_id):
     return description
 
 
+def _get_dataset_parquet(
+    description: Union[Dict, OpenMLDataset], cache_directory: str = None
+) -> Optional[str]:
+    """ Return the path to the local parquet file of the dataset. If it is not cached, it is downloaded.
+
+    Checks if the file is in the cache; if so, returns the path to the file.
+    If not, downloads the file, caches it, and then returns the file path.
+    The cache directory is generated based on dataset information, but can also be specified.
+
+    This function is NOT thread/multiprocessing safe.
+    Unlike the ARFF equivalent, checksums are not available/used (for now).
+
+    Parameters
+    ----------
+    description : dictionary or OpenMLDataset
+        Either a dataset description as dict or OpenMLDataset.
+
+    cache_directory: str, optional (default=None)
+        Folder to store the parquet file in.
+        If None, use the default cache directory for the dataset.
+
+    Returns
+    -------
+    output_filename : string, optional
+        Location of the Parquet file if successfully downloaded, None otherwise.
+ """ + if isinstance(description, dict): + url = description.get("oml:minio_url") + did = description.get("oml:id") + elif isinstance(description, OpenMLDataset): + url = description._minio_url + did = description.dataset_id + else: + raise TypeError("`description` should be either OpenMLDataset or Dict.") + + if cache_directory is None: + cache_directory = _create_cache_directory_for_id(DATASETS_CACHE_DIR_NAME, did) + output_file_path = os.path.join(cache_directory, "dataset.pq") + + if not os.path.isfile(output_file_path): + try: + openml._api_calls._download_minio_file( + source=cast(str, url), destination=output_file_path + ) + except FileNotFoundError: + return None + return output_file_path + + def _get_dataset_arff(description: Union[Dict, OpenMLDataset], cache_directory: str = None) -> str: """ Return the path to the local arff file of the dataset. If is not cached, it is downloaded. @@ -1031,6 +1084,7 @@ def _create_dataset_from_description( features_file: str, qualities_file: str, arff_file: str = None, + parquet_file: str = None, cache_format: str = "pickle", ) -> OpenMLDataset: """Create a dataset object from a description dict. @@ -1045,6 +1099,8 @@ def _create_dataset_from_description( Path of the dataset qualities as xml file. arff_file : string, optional Path of dataset ARFF file. + parquet_file : string, optional + Path of dataset Parquet file. cache_format: string, optional Caching option for datasets (feather/pickle) @@ -1081,6 +1137,8 @@ def _create_dataset_from_description( cache_format=cache_format, features_file=features_file, qualities_file=qualities_file, + minio_url=description.get("oml:minio_url"), + parquet_file=parquet_file, ) diff --git a/setup.py b/setup.py index 22a77bcbc..b2ca57fdc 100644 --- a/setup.py +++ b/setup.py @@ -53,6 +53,8 @@ "pandas>=1.0.0", "scipy>=0.13.3", "numpy>=1.6.2", + "minio", + "pyarrow", ], extras_require={ "test": [ @@ -65,7 +67,6 @@ "nbformat", "oslo.concurrency", "flaky", - "pyarrow", "pre-commit", "pytest-cov", "mypy", diff --git a/tests/files/org/openml/test/datasets/30/dataset.pq b/tests/files/org/openml/test/datasets/30/dataset.pq new file mode 100644 index 000000000..b35597281 Binary files /dev/null and b/tests/files/org/openml/test/datasets/30/dataset.pq differ diff --git a/tests/test_datasets/test_dataset_functions.py b/tests/test_datasets/test_dataset_functions.py index 141535def..32f4575a5 100644 --- a/tests/test_datasets/test_dataset_functions.py +++ b/tests/test_datasets/test_dataset_functions.py @@ -1,6 +1,7 @@ # License: BSD 3-Clause import os +import pathlib import random from itertools import product from unittest import mock @@ -17,6 +18,7 @@ import openml from openml import OpenMLDataset +from openml._api_calls import _download_minio_file from openml.exceptions import ( OpenMLHashException, OpenMLPrivateDatasetError, @@ -34,6 +36,7 @@ _get_online_dataset_arff, _get_online_dataset_format, DATASETS_CACHE_DIR_NAME, + _get_dataset_parquet, ) from openml.datasets import fork_dataset, edit_dataset from openml.tasks import TaskType, create_task @@ -407,6 +410,94 @@ def test__getarff_path_dataset_arff(self): self.assertIsInstance(arff_path, str) self.assertTrue(os.path.exists(arff_path)) + def test__download_minio_file_object_does_not_exist(self): + self.assertRaisesRegex( + FileNotFoundError, + r"Object at .* does not exist", + _download_minio_file, + source="http://openml1.win.tue.nl/dataset20/i_do_not_exist.pq", + destination=self.workdir, + exists_ok=True, + ) + + def test__download_minio_file_to_directory(self): + 
+        _download_minio_file(
+            source="http://openml1.win.tue.nl/dataset20/dataset_20.pq",
+            destination=self.workdir,
+            exists_ok=True,
+        )
+        self.assertTrue(
+            os.path.isfile(os.path.join(self.workdir, "dataset_20.pq")),
+            "_download_minio_file can save to a folder by copying the object name",
+        )
+
+    def test__download_minio_file_to_path(self):
+        file_destination = os.path.join(self.workdir, "custom.pq")
+        _download_minio_file(
+            source="http://openml1.win.tue.nl/dataset20/dataset_20.pq",
+            destination=file_destination,
+            exists_ok=True,
+        )
+        self.assertTrue(
+            os.path.isfile(file_destination),
+            "_download_minio_file can save to a specific file destination",
+        )
+
+    def test__download_minio_file_raises_FileExists_if_destination_in_use(self):
+        file_destination = pathlib.Path(self.workdir, "custom.pq")
+        file_destination.touch()
+
+        self.assertRaises(
+            FileExistsError,
+            _download_minio_file,
+            source="http://openml1.win.tue.nl/dataset20/dataset_20.pq",
+            destination=str(file_destination),
+            exists_ok=False,
+        )
+
+    def test__download_minio_file_works_with_bucket_subdirectory(self):
+        file_destination = pathlib.Path(self.workdir, "custom.csv")
+        _download_minio_file(
+            source="http://openml1.win.tue.nl/test/subdirectory/test.csv",
+            destination=file_destination,
+            exists_ok=True,
+        )
+        self.assertTrue(
+            os.path.isfile(file_destination),
+            "_download_minio_file can download from subdirectories",
+        )
+
+    def test__get_dataset_parquet_not_cached(self):
+        description = {
+            "oml:minio_url": "http://openml1.win.tue.nl/dataset20/dataset_20.pq",
+            "oml:id": "20",
+        }
+        path = _get_dataset_parquet(description, cache_directory=self.workdir)
+        self.assertIsInstance(path, str, "_get_dataset_parquet returns a path")
+        self.assertTrue(os.path.isfile(path), "_get_dataset_parquet returns path to real file")
+
+    @mock.patch("openml._api_calls._download_minio_file")
+    def test__get_dataset_parquet_is_cached(self, patch):
+        openml.config.cache_directory = self.static_cache_dir
+        patch.side_effect = RuntimeError(
+            "_download_minio_file should not be called when loading from cache"
+        )
+        description = {
+            "oml:minio_url": "http://openml1.win.tue.nl/dataset30/dataset_30.pq",
+            "oml:id": "30",
+        }
+        path = _get_dataset_parquet(description, cache_directory=None)
+        self.assertIsInstance(path, str, "_get_dataset_parquet returns a path")
+        self.assertTrue(os.path.isfile(path), "_get_dataset_parquet returns path to real file")
+
+    def test__get_dataset_parquet_file_does_not_exist(self):
+        description = {
+            "oml:minio_url": "http://openml1.win.tue.nl/dataset20/does_not_exist.pq",
+            "oml:id": "20",
+        }
+        path = _get_dataset_parquet(description, cache_directory=self.workdir)
+        self.assertIsNone(path, "_get_dataset_parquet returns None if no file is found")
+
     def test__getarff_md5_issue(self):
         description = {
             "oml:id": 5,
@@ -1413,6 +1504,12 @@ def test_data_fork(self):
             OpenMLServerException, "Unknown dataset", fork_dataset, data_id=999999,
         )
 
+    def test_get_dataset_parquet(self):
+        dataset = openml.datasets.get_dataset(20)
+        self.assertIsNotNone(dataset._minio_url)
+        self.assertIsNotNone(dataset.parquet_file)
+        self.assertTrue(os.path.isfile(dataset.parquet_file))
+
 
 @pytest.mark.parametrize(
     "default_target_attribute,row_id_attribute,ignore_attribute",