45 changes: 44 additions & 1 deletion openml/_api_calls.py
@@ -3,10 +3,14 @@
import time
import hashlib
import logging
import pathlib
import requests
import urllib.parse
import xml
import xmltodict
from typing import Dict, Optional
from typing import Dict, Optional, Union

import minio

from . import config
from .exceptions import (
@@ -68,6 +72,45 @@ def _perform_api_call(call, request_method, data=None, file_elements=None):
return response.text


def _download_minio_file(
source: str, destination: Union[str, pathlib.Path], exists_ok: bool = True,
) -> None:
""" Download file ``source`` from a MinIO Bucket and store it at ``destination``.

Parameters
----------
source : str
URL to a file in a MinIO bucket.
destination : Union[str, pathlib.Path]
Path to store the file to; if a directory is provided, the original filename is used.
exists_ok : bool, optional (default=True)
If False, raise FileExistsError if a file already exists at ``destination``.

"""
destination = pathlib.Path(destination)
parsed_url = urllib.parse.urlparse(source)

# expect path format: /BUCKET/path/to/file.ext
bucket, object_name = parsed_url.path[1:].split("/", maxsplit=1)
if destination.is_dir():
destination = pathlib.Path(destination, object_name)
if destination.is_file() and not exists_ok:
raise FileExistsError(f"File already exists in {destination}.")

client = minio.Minio(endpoint=parsed_url.netloc, secure=False)

try:
client.fget_object(
bucket_name=bucket, object_name=object_name, file_path=str(destination),
)
except minio.error.S3Error as e:
if e.message.startswith("Object does not exist"):
raise FileNotFoundError(f"Object at '{source}' does not exist.") from e
# e.g. a permission error, or the bucket does not exist (which MinIO also reports as a
# permission error).
raise FileNotFoundError("Bucket does not exist or is private.") from e


def _download_text_file(
source: str,
output_path: Optional[str] = None,
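
A minimal usage sketch of the new helper follows; the MinIO URL and destination path are made up for illustration and do not come from this PR:

import urllib.parse

# Hypothetical MinIO URL of the kind the server returns as "oml:minio_url".
url = "https://minio.example.org/dataset20/dataset_20.pq"

# Same parsing as in _download_minio_file: drop the leading "/" and split once.
parsed = urllib.parse.urlparse(url)
bucket, object_name = parsed.path[1:].split("/", maxsplit=1)
print(bucket)       # -> "dataset20"
print(object_name)  # -> "dataset_20.pq"

# With such a URL, the helper would be invoked roughly like this:
# openml._api_calls._download_minio_file(url, destination="/tmp/dataset_20.pq")
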
58 changes: 44 additions & 14 deletions openml/datasets/dataset.py
@@ -96,6 +96,10 @@ class OpenMLDataset(OpenMLBase):
which maps a quality name to a quality value.
dataset: string, optional
Serialized arff dataset string.
minio_url: string, optional
URL to the MinIO bucket with the dataset files.
parquet_file: string, optional
Path to the local parquet file.
"""

def __init__(
@@ -128,6 +132,8 @@ def __init__(
features_file: Optional[str] = None,
qualities_file: Optional[str] = None,
dataset=None,
minio_url: Optional[str] = None,
parquet_file: Optional[str] = None,
):
def find_invalid_characters(string, pattern):
invalid_chars = set()
@@ -202,7 +208,9 @@ def find_invalid_characters(string, pattern):
self.update_comment = update_comment
self.md5_checksum = md5_checksum
self.data_file = data_file
self.parquet_file = parquet_file
self._dataset = dataset
self._minio_url = minio_url

if features_file is not None:
self.features = _read_features(
@@ -291,9 +299,11 @@ def __eq__(self, other):
def _download_data(self) -> None:
""" Download ARFF data file to standard cache directory. Set `self.data_file`. """
# import required here to avoid circular import.
from .functions import _get_dataset_arff
from .functions import _get_dataset_arff, _get_dataset_parquet

self.data_file = _get_dataset_arff(self)
if self._minio_url is not None:
self.parquet_file = _get_dataset_parquet(self)

def _get_arff(self, format: str) -> Dict:
"""Read ARFF file and return decoded arff.
@@ -454,22 +464,38 @@ def _parse_data_from_arff(
return X, categorical, attribute_names

def _compressed_cache_file_paths(self, data_file: str) -> Tuple[str, str, str]:
data_pickle_file = data_file.replace(".arff", ".pkl.py3")
data_feather_file = data_file.replace(".arff", ".feather")
feather_attribute_file = data_file.replace(".arff", ".feather.attributes.pkl.py3")
ext = f".{data_file.split('.')[-1]}"
data_pickle_file = data_file.replace(ext, ".pkl.py3")
data_feather_file = data_file.replace(ext, ".feather")
feather_attribute_file = data_file.replace(ext, ".feather.attributes.pkl.py3")
return data_pickle_file, data_feather_file, feather_attribute_file

def _cache_compressed_file_from_arff(
self, arff_file: str
def _cache_compressed_file_from_file(
self, data_file: str
) -> Tuple[Union[pd.DataFrame, scipy.sparse.csr_matrix], List[bool], List[str]]:
""" Store data from the arff file in compressed format. Sets cache_format to 'pickle' if data is sparse. """ # noqa: 501
""" Store data from the local file in compressed format.

If a local parquet file is present, it will be used instead of the arff file.
Sets cache_format to 'pickle' if data is sparse.
"""
(
data_pickle_file,
data_feather_file,
feather_attribute_file,
) = self._compressed_cache_file_paths(arff_file)
) = self._compressed_cache_file_paths(data_file)

data, categorical, attribute_names = self._parse_data_from_arff(arff_file)
if data_file.endswith(".arff"):
data, categorical, attribute_names = self._parse_data_from_arff(data_file)
elif data_file.endswith(".pq"):
try:
data = pd.read_parquet(data_file)
except Exception as e:
raise Exception(f"File: {data_file}") from e

categorical = [data[c].dtype.name == "category" for c in data.columns]
attribute_names = list(data.columns)
else:
raise ValueError(f"Unknown file type for file '{data_file}'.")

# Feather format does not work for sparse datasets, so we use pickle for sparse datasets
if scipy.sparse.issparse(data):
@@ -480,12 +506,16 @@ def _cache_compressed_file_from_arff(
data.to_feather(data_feather_file)
with open(feather_attribute_file, "wb") as fh:
pickle.dump((categorical, attribute_names), fh, pickle.HIGHEST_PROTOCOL)
self.data_feather_file = data_feather_file
self.feather_attribute_file = feather_attribute_file
else:
with open(data_pickle_file, "wb") as fh:
pickle.dump((data, categorical, attribute_names), fh, pickle.HIGHEST_PROTOCOL)
self.data_pickle_file = data_pickle_file

data_file = data_pickle_file if self.cache_format == "pickle" else data_feather_file
logger.debug(f"Saved dataset {int(self.dataset_id or -1)}: {self.name} to file {data_file}")

return data, categorical, attribute_names

def _load_data(self):
@@ -496,10 +526,9 @@ def _load_data(self):
if need_to_create_pickle or need_to_create_feather:
if self.data_file is None:
self._download_data()
res = self._compressed_cache_file_paths(self.data_file)
self.data_pickle_file, self.data_feather_file, self.feather_attribute_file = res
# Since our recently stored data exists in memory, there is no need to load from disk
return self._cache_compressed_file_from_arff(self.data_file)

file_to_load = self.data_file if self.parquet_file is None else self.parquet_file
return self._cache_compressed_file_from_file(file_to_load)

# helper variable to help identify where errors occur
fpath = self.data_feather_file if self.cache_format == "feather" else self.data_pickle_file
@@ -543,7 +572,8 @@ def _load_data(self):
data_up_to_date = isinstance(data, pd.DataFrame) or scipy.sparse.issparse(data)
if self.cache_format == "pickle" and not data_up_to_date:
logger.info("Updating outdated pickle file.")
return self._cache_compressed_file_from_arff(self.data_file)
file_to_load = self.data_file if self.parquet_file is None else self.parquet_file
return self._cache_compressed_file_from_file(file_to_load)
return data, categorical, attribute_names

@staticmethod
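
To illustrate the dataset.py changes above, here is a small sketch of the extension-agnostic cache naming and the new parquet branch; the file name is invented, and reading parquet assumes pyarrow is installed (now a hard dependency, see setup.py below):

import pandas as pd

# Extension-agnostic cache names, mirroring _compressed_cache_file_paths:
data_file = "dataset.pq"  # would equally work for "dataset.arff"
ext = f".{data_file.split('.')[-1]}"
data_pickle_file = data_file.replace(ext, ".pkl.py3")   # "dataset.pkl.py3"
data_feather_file = data_file.replace(ext, ".feather")  # "dataset.feather"

# The new parquet branch of _cache_compressed_file_from_file, in essence:
data = pd.read_parquet(data_file)  # assumes the local file exists
categorical = [data[c].dtype.name == "category" for c in data.columns]
attribute_names = list(data.columns)
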
62 changes: 60 additions & 2 deletions openml/datasets/functions.py
@@ -3,7 +3,7 @@
import io
import logging
import os
from typing import List, Dict, Union, Optional
from typing import List, Dict, Union, Optional, cast

import numpy as np
import arff
@@ -424,6 +424,10 @@ def get_dataset(
raise

arff_file = _get_dataset_arff(description) if download_data else None
if "oml:minio_url" in description and download_data:
Collaborator: Could you please explain why we potentially download both files? I guess this is easier to handle at the moment?

Collaborator (author): The main reason is that the test server currently always returns a minio_url entry, regardless of whether or not the parquet file actually exists. I suppose you could turn it around and only download the arff file if the parquet file could not be downloaded.
Some people also might still be interested in having the arff file (for now), and we would not have a public API for downloading that file, so in the interest of merging this before a PyPI release, I figured keeping that behavior for now is better.

Collaborator: That makes sense. Do you think we should open an issue to track the move from arff to parquet?

Collaborator (author): Seems like a good idea: #1032.

parquet_file = _get_dataset_parquet(description)
else:
parquet_file = None
remove_dataset_cache = False
except OpenMLServerException as e:
# if there was an exception,
@@ -437,7 +441,7 @@
_remove_cache_dir_for_id(DATASETS_CACHE_DIR_NAME, did_cache_dir)

dataset = _create_dataset_from_description(
description, features_file, qualities_file, arff_file, cache_format
description, features_file, qualities_file, arff_file, parquet_file, cache_format
)
return dataset

@@ -908,6 +912,55 @@ def _get_dataset_description(did_cache_dir, dataset_id):
return description


def _get_dataset_parquet(
description: Union[Dict, OpenMLDataset], cache_directory: str = None
) -> Optional[str]:
""" Return the path to the local parquet file of the dataset. If is not cached, it is downloaded.

Checks if the file is in the cache, if yes, return the path to the file.
If not, downloads the file and caches it, then returns the file path.
The cache directory is generated based on dataset information, but can also be specified.

This function is NOT thread/multiprocessing safe.
Unlike the ARFF equivalent, checksums are not available/used (for now).

Parameters
----------
description : dictionary or OpenMLDataset
Either a dataset description as dict or OpenMLDataset.

cache_directory: str, optional (default=None)
Folder to store the parquet file in.
If None, use the default cache directory for the dataset.

Returns
-------
output_filename : string, optional
Location of the Parquet file if successfully downloaded, None otherwise.
"""
if isinstance(description, dict):
url = description.get("oml:minio_url")
did = description.get("oml:id")
elif isinstance(description, OpenMLDataset):
url = description._minio_url
did = description.dataset_id
else:
raise TypeError("`description` should be either OpenMLDataset or Dict.")

if cache_directory is None:
cache_directory = _create_cache_directory_for_id(DATASETS_CACHE_DIR_NAME, did)
output_file_path = os.path.join(cache_directory, "dataset.pq")

if not os.path.isfile(output_file_path):
try:
openml._api_calls._download_minio_file(
source=cast(str, url), destination=output_file_path
)
except FileNotFoundError:
return None
return output_file_path


def _get_dataset_arff(description: Union[Dict, OpenMLDataset], cache_directory: str = None) -> str:
""" Return the path to the local arff file of the dataset. If is not cached, it is downloaded.

@@ -1031,6 +1084,7 @@ def _create_dataset_from_description(
features_file: str,
qualities_file: str,
arff_file: str = None,
parquet_file: str = None,
cache_format: str = "pickle",
) -> OpenMLDataset:
"""Create a dataset object from a description dict.
Expand All @@ -1045,6 +1099,8 @@ def _create_dataset_from_description(
Path of the dataset qualities as xml file.
arff_file : string, optional
Path of dataset ARFF file.
parquet_file : string, optional
Path of dataset Parquet file.
cache_format: string, optional
Caching option for datasets (feather/pickle)

@@ -1045,6 +1099,8 @@
cache_format=cache_format,
features_file=features_file,
qualities_file=qualities_file,
minio_url=description.get("oml:minio_url"),
parquet_file=parquet_file,
)


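
From the user's side, the functions.py changes above might play out roughly as follows; this sketch assumes a dataset whose description carries an oml:minio_url, the dataset id is only an example, and network access is required:

import openml

# get_dataset now also fetches the parquet file when the description
# contains "oml:minio_url"; its local path ends up in dataset.parquet_file
# (or None if no parquet file was available), and get_data() is served
# from the cached copy.
dataset = openml.datasets.get_dataset(61, download_data=True)
print(dataset.parquet_file)

X, y, categorical, attribute_names = dataset.get_data(
    target=dataset.default_target_attribute
)
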
3 changes: 2 additions & 1 deletion setup.py
@@ -53,6 +53,8 @@
"pandas>=1.0.0",
"scipy>=0.13.3",
"numpy>=1.6.2",
"minio",
"pyarrow",
],
extras_require={
"test": [
@@ -65,7 +67,6 @@
"nbformat",
"oslo.concurrency",
"flaky",
"pyarrow",
"pre-commit",
"pytest-cov",
"mypy",
Binary file not shown.