# -*- coding: utf-8 -*-
"""Utility classes for constructing datasets."""
import logging
import os
import pathlib
import shutil
import tarfile
import zipfile
from abc import abstractmethod
from io import BytesIO
from typing import Any, List, Mapping, Optional, TextIO, Tuple, Union
from urllib.parse import urlparse
from urllib.request import urlretrieve
import pandas as pd
import requests
from tabulate import tabulate
from ..constants import PYKEEN_DATASETS
from ..triples import TriplesFactory
from ..typing import TorchRandomHint
from ..utils import normalize_string
__all__ = [
'Dataset',
'EagerDataset',
'LazyDataset',
'PathDataset',
'RemoteDataset',
'UnpackedRemoteDataset',
'TarFileRemoteDataset',
'ZipFileRemoteDataset',
'PackedZipRemoteDataset',
'TarFileSingleDataset',
'TabbedDataset',
'SingleTabbedDataset',
]
logger = logging.getLogger(__name__)
class Dataset:
    """Base container exposing training, testing, and validation triples factories."""

    #: A factory wrapping the training triples
    training: TriplesFactory
    #: A factory wrapping the testing triples, that share indices with the training triples
    testing: TriplesFactory
    #: A factory wrapping the validation triples, that share indices with the training triples
    validation: TriplesFactory
    #: All datasets should take care of inverse triple creation
    create_inverse_triples: bool

    @property
    def entity_to_id(self):  # noqa: D401
        """The entity label to ID mapping, as held by the training factory."""
        training_factory = self.training
        return training_factory.entity_to_id

    @property
    def relation_to_id(self):  # noqa: D401
        """The relation label to ID mapping, as held by the training factory."""
        training_factory = self.training
        return training_factory.relation_to_id

    @property
    def num_entities(self):  # noqa: D401
        """The number of entities reported by the training factory."""
        training_factory = self.training
        return training_factory.num_entities

    @property
    def num_relations(self):  # noqa: D401
        """The number of relations reported by the training factory."""
        training_factory = self.training
        return training_factory.num_relations

    def _summary_rows(self):
        """Build one ``(label, entities, relations, triples)`` row per factory."""
        factories = (self.training, self.testing, self.validation)
        labels = ('Training', 'Testing', 'Validation')
        summary = []
        for name, factory in zip(labels, factories):
            summary.append((name, factory.num_entities, factory.num_relations, factory.num_triples))
        return summary

    def summary_str(self, title: Optional[str] = None, end='\n') -> str:
        """Render a table summarizing all three factories.

        :param title: Heading for the summary. Falls back to the class name when empty or omitted.
        :param end: Text appended after the table.
        :returns: The heading line followed by the tabulated rows and ``end``.
        """
        table_rows = self._summary_rows()
        # the triple count is the last entry of every row
        total_triples = sum(row[-1] for row in table_rows)
        table_rows.append(('Total', '-', '-', total_triples))
        table = tabulate(table_rows, headers=['Name', 'Entities', 'Relations', 'Triples'])
        heading = title or self.__class__.__name__
        return f'{heading} (create_inverse_triples={self.create_inverse_triples})\n{table}{end}'

    def summarize(self, title: Optional[str] = None, file=None) -> None:
        """Print the summary produced by :meth:`summary_str`.

        :param title: Heading forwarded to :meth:`summary_str`.
        :param file: Destination forwarded to :func:`print`.
        """
        text = self.summary_str(title=title)
        print(text, file=file)

    def __str__(self) -> str:  # noqa: D105
        cls_name = self.__class__.__name__
        return f'{cls_name}(num_entities={self.num_entities}, num_relations={self.num_relations})'

    @classmethod
    def from_path(cls, path: str, ratios: Optional[List[float]] = None) -> 'Dataset':
        """Load a single triples file and split it into three parts.

        :param path: Location handed to :meth:`TriplesFactory.from_path`.
        :param ratios: Split ratios forwarded to :meth:`from_tf`.
        :returns: The dataset produced by :meth:`from_tf`.
        """
        factory = TriplesFactory.from_path(path=path)
        return cls.from_tf(tf=factory, ratios=ratios)

    @staticmethod
    def from_tf(tf: TriplesFactory, ratios: Optional[List[float]] = None) -> 'Dataset':
        """Split a single triples factory into training, testing, and validation.

        :param tf: The factory to split.
        :param ratios: Split ratios. When empty or omitted, ``[0.8, 0.1, 0.1]`` is used.
        :returns: An :class:`EagerDataset` wrapping the three resulting factories.
        """
        if not ratios:
            ratios = [0.8, 0.1, 0.1]
        parts = tf.split(ratios)
        training, testing, validation = parts
        return EagerDataset(training=training, testing=testing, validation=validation)

    @classmethod
    def get_normalized_name(cls) -> str:
        """Return the class name passed through :func:`normalize_string`."""
        raw_name = cls.__name__
        return normalize_string(raw_name)
class EagerDataset(Dataset):
    """A dataset built from three triples factories that already exist in memory."""

    def __init__(self, training: TriplesFactory, testing: TriplesFactory, validation: TriplesFactory) -> None:
        """Store the given factories.

        :param training: The training triples factory.
        :param testing: The testing triples factory.
        :param validation: The validation triples factory.
        """
        self.training = training
        self.testing = testing
        self.validation = validation
        # Equivalent to chaining the three flags with ``and``: stop at the first falsy one.
        inverse_flag = training.create_inverse_triples
        if inverse_flag:
            inverse_flag = testing.create_inverse_triples
        if inverse_flag:
            inverse_flag = self.validation.create_inverse_triples
        self.create_inverse_triples = inverse_flag
class LazyDataset(Dataset):
    """A dataset whose triples factories are only built when first accessed."""

    #: The actual instance of the training factory, which is exposed to the user through `training`
    _training: Optional[TriplesFactory] = None
    #: The actual instance of the testing factory, which is exposed to the user through `testing`
    _testing: Optional[TriplesFactory] = None
    #: The actual instance of the validation factory, which is exposed to the user through `validation`
    _validation: Optional[TriplesFactory] = None
    #: The directory in which the cached data is stored
    cache_root: pathlib.Path

    @property
    def training(self) -> TriplesFactory:  # noqa: D401
        """The training triples factory, loaded on first access."""
        needs_loading = not self._loaded
        if needs_loading:
            self._load()
        return self._training

    @property
    def testing(self) -> TriplesFactory:  # noqa: D401
        """The testing triples factory, loaded on first access."""
        needs_loading = not self._loaded
        if needs_loading:
            self._load()
        return self._testing

    @property
    def validation(self) -> TriplesFactory:  # noqa: D401
        """The validation triples factory, loaded on first access.

        Training and testing are loaded first when they are not yet available.
        """
        needs_loading = not self._loaded
        if needs_loading:
            self._load()
        needs_validation = not self._loaded_validation
        if needs_validation:
            self._load_validation()
        return self._validation

    @property
    def _loaded(self) -> bool:
        # loaded means both the training and the testing factory are present
        return not (self._training is None or self._testing is None)

    @property
    def _loaded_validation(self):
        return not (self._validation is None)

    def _load(self) -> None:
        # subclasses populate ``_training`` and ``_testing`` here
        raise NotImplementedError

    def _load_validation(self) -> None:
        # subclasses populate ``_validation`` here
        raise NotImplementedError

    def _help_cache(self, cache_root: Optional[str]) -> pathlib.Path:
        """Resolve and create the cache directory for this dataset class.

        :param cache_root: Base directory. When ``None``, :data:`pykeen.constants.PYKEEN_DATASETS`
            is used. A subfolder named after the lowercased class name is appended.
        :returns: The path of the (now existing) class-specific cache directory.
        """
        base_directory = PYKEEN_DATASETS if cache_root is None else cache_root
        directory = pathlib.Path(base_directory) / self.__class__.__name__.lower()
        directory.mkdir(parents=True, exist_ok=True)
        logger.debug('using cache root at %s', directory)
        return directory
class PathDataset(LazyDataset):
    """A lazily loaded dataset backed by three separate triples files."""

    def __init__(
        self,
        training_path: Union[str, TextIO],
        testing_path: Union[str, TextIO],
        validation_path: Union[str, TextIO],
        eager: bool = False,
        create_inverse_triples: bool = False,
    ) -> None:
        """Initialize the dataset.

        :param training_path: Path to the training triples file or training triples file.
        :param testing_path: Path to the testing triples file or testing triples file.
        :param validation_path: Path to the validation triples file or validation triples file.
        :param eager: If true, all three factories are loaded during initialization.
        :param create_inverse_triples: Forwarded to every :meth:`TriplesFactory.from_path` call.
        """
        self.training_path = training_path
        self.testing_path = testing_path
        self.validation_path = validation_path
        self.create_inverse_triples = create_inverse_triples
        if not eager:
            return
        self._load()
        self._load_validation()

    def _load(self) -> None:
        training_factory = TriplesFactory.from_path(
            path=self.training_path,
            create_inverse_triples=self.create_inverse_triples,
        )
        self._training = training_factory
        # testing reuses the entity and relation indices built from training
        self._testing = TriplesFactory.from_path(
            path=self.testing_path,
            entity_to_id=training_factory.entity_to_id,
            relation_to_id=training_factory.relation_to_id,
            create_inverse_triples=self.create_inverse_triples,
        )

    def _load_validation(self) -> None:
        # Not meant to be called on its own: it relies on ``_training`` already
        # being populated, which the ``validation`` property takes care of.
        training_factory = self._training
        self._validation = TriplesFactory.from_path(
            path=self.validation_path,
            entity_to_id=training_factory.entity_to_id,
            relation_to_id=training_factory.relation_to_id,
            create_inverse_triples=self.create_inverse_triples,
        )

    def __repr__(self) -> str:  # noqa: D105
        cls_name = self.__class__.__name__
        arguments = (
            f'training_path="{self.training_path}", testing_path="{self.testing_path}",'
            f' validation_path="{self.validation_path}"'
        )
        return f'{cls_name}({arguments})'
def _name_from_url(url: str) -> str:
    """Get the filename from the end of the URL.

    :param url: The URL to inspect.
    :returns: The final component of the URL's path part.
    """
    url_path = urlparse(url).path
    # URL paths always use forward slashes, hence the POSIX flavour
    name = pathlib.PurePosixPath(url_path).name
    logger.debug('parsed name from URL: %s', name)
    return name
def _urlretrieve(url: str, path: str, clean_on_failure: bool = True, stream: bool = True) -> None:
    """Download a file from a given URL.

    :param url: URL to download
    :param path: Path to download the file to
    :param clean_on_failure: If true, will delete the file on any exception raised during download
    :param stream: If true, stream the response body to disk with :mod:`requests`;
        otherwise use :func:`urllib.request.urlretrieve`.
    :raises requests.HTTPError: In streaming mode, if the server answers with an error status.
    """
    try:
        if not stream:
            logger.info('downloading from %s to %s', url, path)
            urlretrieve(url, path)  # noqa:S310
        else:
            # see https://requests.readthedocs.io/en/master/user/quickstart/#raw-response-content
            # pattern from https://stackoverflow.com/a/39217788/5775947
            with requests.get(url, stream=True) as response:
                # Fail before writing anything, otherwise an HTTP error page would be
                # stored at ``path`` and later be mistaken for cached data.
                response.raise_for_status()
                # NOTE(review): ``response.raw`` is copied without content decoding;
                # confirm no source serves these files with a Content-Encoding.
                with open(path, 'wb') as file:
                    logger.info('downloading (streaming) from %s to %s', url, path)
                    shutil.copyfileobj(response.raw, file)
    except (Exception, KeyboardInterrupt):
        if clean_on_failure:
            try:
                os.remove(path)
            except FileNotFoundError:
                # nothing was written yet; do not mask the original exception
                pass
        raise
class UnpackedRemoteDataset(PathDataset):
    """A dataset whose training, testing, and validation files each live at their own URL."""

    def __init__(
        self,
        training_url: str,
        testing_url: str,
        validation_url: str,
        cache_root: Optional[str] = None,
        eager: bool = False,
        create_inverse_triples: bool = False,
        stream: bool = True,
        force: bool = False,
    ):
        """Initialize dataset.

        :param training_url: The URL of the training file
        :param testing_url: The URL of the testing file
        :param validation_url: The URL of the validation file
        :param cache_root:
            An optional directory to store the downloaded files. If none is given, the default
            provided by :meth:`_help_cache` is used.
        :param eager: Should the data be loaded eagerly? Defaults to false.
        :param create_inverse_triples: Should inverse triples be created? Defaults to false.
        :param stream: Forwarded to :func:`_urlretrieve` to select the download mode.
        :param force: If true, download each file even when it already exists in the cache.
        """
        self.cache_root = self._help_cache(cache_root)

        self.training_url = training_url
        self.testing_url = testing_url
        self.validation_url = validation_url

        # every file is cached under the name found at the end of its URL
        sources = (self.training_url, self.testing_url, self.validation_url)
        targets = [
            os.path.join(self.cache_root, _name_from_url(source))
            for source in sources
        ]

        for source, target in zip(sources, targets):
            if not os.path.exists(target) or force:
                _urlretrieve(source, target, stream=stream)

        training_path, testing_path, validation_path = targets
        super().__init__(
            training_path=training_path,
            testing_path=testing_path,
            validation_path=validation_path,
            eager=eager,
            create_inverse_triples=create_inverse_triples,
        )
class RemoteDataset(PathDataset):
    """Contains a lazy reference to a remote dataset that is loaded if needed."""

    def __init__(
        self,
        url: str,
        relative_training_path: str,
        relative_testing_path: str,
        relative_validation_path: str,
        cache_root: Optional[str] = None,
        eager: bool = False,
        create_inverse_triples: bool = False,
    ):
        """Initialize dataset.

        :param url:
            The url where to download the dataset from.
        :param relative_training_path:
            The path of the training file, relative to the cache root, after extraction.
        :param relative_testing_path:
            The path of the testing file, relative to the cache root, after extraction.
        :param relative_validation_path:
            The path of the validation file, relative to the cache root, after extraction.
        :param cache_root:
            An optional directory to store the extracted files. If none is given, the default
            provided by :meth:`_help_cache` is used.
        :param eager: Should the data be loaded eagerly? Defaults to false.
        :param create_inverse_triples: Should inverse triples be created? Defaults to false.
        """
        self.cache_root = self._help_cache(cache_root)

        self.url = url
        self._relative_training_path = relative_training_path
        self._relative_testing_path = relative_testing_path
        self._relative_validation_path = relative_validation_path

        training_path, testing_path, validation_path = self._get_paths()
        super().__init__(
            training_path=training_path,
            testing_path=testing_path,
            validation_path=validation_path,
            eager=eager,
            create_inverse_triples=create_inverse_triples,
        )

    def _get_paths(self) -> Tuple[str, str, str]:  # noqa: D401
        """The paths where the extracted files can be found."""
        return (
            os.path.join(self.cache_root, self._relative_training_path),
            os.path.join(self.cache_root, self._relative_testing_path),
            os.path.join(self.cache_root, self._relative_validation_path),
        )

    @abstractmethod
    def _extract(self, archive_file: BytesIO) -> None:
        """Extract from the downloaded file."""
        raise NotImplementedError

    def _get_bytes(self) -> BytesIO:
        """Download the archive at :attr:`url` into an in-memory buffer.

        :returns: The response content wrapped in a :class:`io.BytesIO`.
        :raises requests.HTTPError: If the server answers with an error status.
        """
        logger.info('Requesting dataset from %s', self.url)
        res = requests.get(url=self.url)
        res.raise_for_status()
        return BytesIO(res.content)

    def _load(self) -> None:  # noqa: D102
        # ``isfile`` is already false for missing paths, no separate existence check needed
        all_unpacked = all(os.path.isfile(path) for path in self._get_paths())

        if not all_unpacked:
            archive_file = self._get_bytes()
            self._extract(archive_file=archive_file)
            logger.info('Extracted to %s.', self.cache_root)

        super()._load()
class TarFileRemoteDataset(RemoteDataset):
    """A remote dataset stored as a tar file."""

    def _extract(self, archive_file: BytesIO) -> None:  # noqa: D102
        with tarfile.open(fileobj=archive_file) as tf:
            # The archive comes from the network. Where the interpreter supports
            # extraction filters, use the 'data' filter so that members cannot
            # be written outside of the cache directory.
            if hasattr(tarfile, 'data_filter'):
                tf.extractall(path=self.cache_root, filter='data')
            else:
                tf.extractall(path=self.cache_root)  # noqa:S202
# TODO replace this with the new zip remote dataset class
class ZipFileRemoteDataset(RemoteDataset):
    """A remote dataset that is distributed as a zip archive."""

    def _extract(self, archive_file: BytesIO) -> None:  # noqa: D102
        # unpack every member of the in-memory archive into the cache directory
        archive = zipfile.ZipFile(file=archive_file)
        with archive:
            archive.extractall(path=self.cache_root)
class PackedZipRemoteDataset(LazyDataset):
    """Contains a lazy reference to a remote dataset that is loaded if needed.

    The triples files are read directly from inside the zip archive, which is
    never unpacked onto disk.
    """

    # positions of the head, relation, and tail columns inside each file
    head_column: int = 0
    relation_column: int = 1
    tail_column: int = 2
    # passed to :func:`pandas.read_csv` as ``sep`` and ``header``
    sep = '\t'
    header = None

    def __init__(
        self,
        relative_training_path: str,
        relative_testing_path: str,
        relative_validation_path: str,
        url: Optional[str] = None,
        name: Optional[str] = None,
        cache_root: Optional[str] = None,
        eager: bool = False,
        create_inverse_triples: bool = False,
    ):
        """Initialize dataset.

        :param relative_training_path: The path of the training file inside the archive
        :param relative_testing_path: The path of the testing file inside the archive
        :param relative_validation_path: The path of the validation file inside the archive
        :param url:
            The url where to download the dataset from
        :param name:
            The name of the file. If not given, tries to get the name from the end of the URL
        :param cache_root:
            An optional directory to store the downloaded archive. If none is given, the default
            provided by :meth:`_help_cache` is used.
        :param eager: Should the data be loaded eagerly? Defaults to false.
        :param create_inverse_triples: Should inverse triples be created? Defaults to false.
        :raises ValueError: If neither ``name`` nor ``url`` is given, or if the archive is
            not cached and no ``url`` is given.
        """
        self.cache_root = self._help_cache(cache_root)

        # Without this guard, a missing name AND url fails deep inside URL parsing.
        if not name and not url:
            raise ValueError('must specify at least one of name or url')

        self.name = name or _name_from_url(url)
        self.path = os.path.join(self.cache_root, self.name)
        logger.debug('file path at %s', self.path)

        self.url = url
        if not os.path.exists(self.path) and not self.url:
            raise ValueError(f'must specify url to download from since path does not exist: {self.path}')

        self.relative_training_path = relative_training_path
        self.relative_testing_path = relative_testing_path
        self.relative_validation_path = relative_validation_path
        self.create_inverse_triples = create_inverse_triples
        if eager:
            self._load()
            self._load_validation()

    def _load(self) -> None:  # noqa: D102
        self._training = self._load_helper(self.relative_training_path)
        self._testing = self._load_helper(
            self.relative_testing_path,
            entity_to_id=self._training.entity_to_id,  # share entity index with training
            relation_to_id=self._training.relation_to_id,  # share relation index with training
        )

    def _load_validation(self) -> None:
        # the shared indices come from training, so make sure it is available
        if not self._loaded:
            self._load()
        self._validation = self._load_helper(
            self.relative_validation_path,
            entity_to_id=self._training.entity_to_id,  # share entity index with training
            relation_to_id=self._training.relation_to_id,  # share relation index with training
        )

    def _load_helper(self, relative_path, entity_to_id=None, relation_to_id=None) -> TriplesFactory:
        """Read one file from the archive into a triples factory.

        :param relative_path: The path of the file inside the archive
        :param entity_to_id: An optional existing entity mapping to reuse
        :param relation_to_id: An optional existing relation mapping to reuse
        :returns: The triples factory built from the selected columns of the file
        """
        if not os.path.exists(self.path):
            logger.info('downloading data from %s to %s', self.url, self.path)
            _urlretrieve(self.url, self.path)  # noqa:S310

        with zipfile.ZipFile(file=self.path) as zf:
            with zf.open(relative_path) as file:
                logger.debug('loading %s', relative_path)
                df = pd.read_csv(
                    file,
                    usecols=[self.head_column, self.relation_column, self.tail_column],
                    header=self.header,
                    sep=self.sep,
                )
                rv = TriplesFactory.from_labeled_triples(
                    triples=df.values,
                    create_inverse_triples=self.create_inverse_triples,
                    entity_to_id=entity_to_id,
                    relation_to_id=relation_to_id,
                )
                rv.path = relative_path
                return rv
class TarFileSingleDataset(LazyDataset):
    """Loads a dataset that's a single file inside a tar.gz archive."""

    # ratios used to split the single file into training, testing, and validation
    ratios = (0.8, 0.1, 0.1)

    _triples_factory: Optional[TriplesFactory]

    def __init__(
        self,
        url: str,
        relative_path: str,
        name: Optional[str] = None,
        cache_root: Optional[str] = None,
        eager: bool = False,
        create_inverse_triples: bool = False,
        delimiter: Optional[str] = None,
        random_state: TorchRandomHint = None,
        randomize_cleanup: bool = False,
    ):
        """Initialize dataset.

        :param url:
            The url where to download the dataset from
        :param relative_path:
            The path inside the archive to the contained dataset.
        :param name:
            The name of the file. If not given, tries to get the name from the end of the URL
        :param cache_root:
            An optional directory to store the extracted files. If none is given, the default
            provided by :meth:`_help_cache` is used.
        :param eager: Should the data be loaded eagerly? Defaults to false.
        :param create_inverse_triples: Should inverse triples be created? Defaults to false.
        :param delimiter:
            The delimiter for the contained dataset. Defaults to a tab.
        :param random_state:
            An optional random state to make the training/testing/validation split reproducible.
        :param randomize_cleanup:
            Forwarded to :meth:`TriplesFactory.split`.
        """
        self.cache_root = self._help_cache(cache_root)

        self.name = name or _name_from_url(url)
        self.random_state = random_state
        self.delimiter = delimiter or '\t'
        self.randomize_cleanup = randomize_cleanup
        self.url = url
        self.create_inverse_triples = create_inverse_triples
        self._relative_path = relative_path

        if eager:
            self._load()

    def _get_path(self) -> str:
        """Get the location of the downloaded archive inside the cache directory."""
        return os.path.join(self.cache_root, self.name)

    def _load(self) -> None:
        archive_path = self._get_path()
        if not os.path.exists(archive_path):
            _urlretrieve(self.url, archive_path)  # noqa:S310

        _actual_path = os.path.join(self.cache_root, self._relative_path)
        if not os.path.exists(_actual_path):
            # routine progress information, not an error condition
            logger.info(
                '[%s] untaring from %s (%s) to %s',
                self.__class__.__name__,
                archive_path,
                self._relative_path,
                _actual_path,
            )
            with tarfile.open(archive_path) as archive:
                # The archive comes from the network. Use the 'data' extraction
                # filter where the interpreter supports it.
                if hasattr(tarfile, 'data_filter'):
                    archive.extract(self._relative_path, self.cache_root, filter='data')
                else:
                    archive.extract(self._relative_path, self.cache_root)

        # NOTE(review): no ``header`` is passed, so pandas presumably consumes the first
        # row as column names - confirm the archived files start with a header row.
        df = pd.read_csv(_actual_path, sep=self.delimiter)
        triples_factory = TriplesFactory.from_labeled_triples(
            triples=df.values,
            create_inverse_triples=self.create_inverse_triples,
        )
        triples_factory.path = archive_path
        self._training, self._testing, self._validation = triples_factory.split(
            ratios=self.ratios,
            random_state=self.random_state,
            randomize_cleanup=self.randomize_cleanup,
        )
        logger.info('[%s] done splitting data from %s', self.__class__.__name__, triples_factory.path)

    def _load_validation(self) -> None:
        pass  # already loaded by _load()
class TabbedDataset(LazyDataset):
    """This class is for when you've got a single TSV of edges and want them to get auto-split."""

    # ratios used to split the data into training, testing, and validation
    ratios = (0.8, 0.1, 0.1)

    _triples_factory: Optional[TriplesFactory]

    def __init__(
        self,
        cache_root: Optional[str] = None,
        eager: bool = False,
        create_inverse_triples: bool = False,
        random_state: TorchRandomHint = None,
    ):
        """Initialize dataset.

        :param cache_root:
            An optional directory to store the cached files. If none is given, the default
            provided by :meth:`_help_cache` is used.
        :param eager: Should the data be loaded eagerly? Defaults to false.
        :param create_inverse_triples: Should inverse triples be created? Defaults to false.
        :param random_state: Forwarded to :meth:`TriplesFactory.split`.
        """
        self.cache_root = self._help_cache(cache_root)

        self._triples_factory = None
        self.random_state = random_state
        self.create_inverse_triples = create_inverse_triples
        # nothing is loaded yet
        self._training = self._testing = self._validation = None

        if eager:
            self._load()

    def _get_path(self) -> Optional[str]:
        """Get the path of the data if there's a single file."""
        return None

    def _get_df(self) -> pd.DataFrame:
        # subclasses provide the data frame holding the triples
        raise NotImplementedError

    def _load(self) -> None:
        frame = self._get_df()
        source_path = self._get_path()
        # only attach metadata when a path is actually known
        metadata = {'path': source_path} if source_path else None
        factory = TriplesFactory.from_labeled_triples(
            triples=frame.values,
            create_inverse_triples=self.create_inverse_triples,
            metadata=metadata,
        )
        parts = factory.split(
            ratios=self.ratios,
            random_state=self.random_state,
        )
        self._training, self._testing, self._validation = parts

    def _load_validation(self) -> None:
        pass  # already loaded by _load()
class SingleTabbedDataset(TabbedDataset):
    """This class is for when you've got a single TSV of edges and want them to get auto-split."""

    # ratios used to split the data into training, testing, and validation
    ratios = (0.8, 0.1, 0.1)

    _triples_factory: Optional[TriplesFactory]

    def __init__(
        self,
        url: str,
        name: Optional[str] = None,
        cache_root: Optional[str] = None,
        eager: bool = False,
        create_inverse_triples: bool = False,
        random_state: TorchRandomHint = None,
        read_csv_kwargs: Optional[Mapping[str, Any]] = None,
    ):
        """Initialize dataset.

        :param url:
            The url where to download the dataset from
        :param name:
            The name of the file. If not given, tries to get the name from the end of the URL
        :param cache_root:
            An optional directory to store the downloaded file. If none is given, the default
            provided by :meth:`_help_cache` is used.
        :param eager: Should the data be loaded eagerly? Defaults to false.
        :param create_inverse_triples: Should inverse triples be created? Defaults to false.
        :param random_state: Forwarded to :meth:`TriplesFactory.split`.
        :param read_csv_kwargs:
            Keyword arguments for :func:`pandas.read_csv`. ``sep`` defaults to a tab. The given
            mapping is copied and never modified.
        :raises ValueError: If the file is not cached and no ``url`` is given.
        """
        super().__init__(
            cache_root=cache_root,
            create_inverse_triples=create_inverse_triples,
            random_state=random_state,
            eager=False,  # because it gets hooked below
        )

        self.name = name or _name_from_url(url)

        # Copy before applying defaults: the caller's mapping must not be mutated,
        # and a read-only Mapping does not offer ``setdefault`` at all.
        self.read_csv_kwargs = dict(read_csv_kwargs or {})
        self.read_csv_kwargs.setdefault('sep', '\t')

        self.url = url
        if not os.path.exists(self._get_path()) and not self.url:
            raise ValueError(f'must specify url to download from since path does not exist: {self._get_path()}')

        if eager:
            self._load()

    def _get_path(self) -> str:
        """Get the location of the downloaded file inside the cache directory."""
        return os.path.join(self.cache_root, self.name)

    def _get_df(self) -> pd.DataFrame:
        """Download the file if it is not cached yet and read it with pandas."""
        if not os.path.exists(self._get_path()):
            logger.info('downloading data from %s to %s', self.url, self._get_path())
            _urlretrieve(self.url, self._get_path())  # noqa:S310
        df = pd.read_csv(self._get_path(), **self.read_csv_kwargs)
        return df