# -*- coding: utf-8 -*-
"""Utility classes for constructing datasets."""
import logging
import os
import shutil
import tarfile
import zipfile
from abc import abstractmethod
from io import BytesIO
from typing import List, Optional, TextIO, Tuple, Union
from urllib.parse import urlparse
import numpy as np
import pandas as pd
import requests
from tabulate import tabulate
from ..constants import PYKEEN_HOME
from ..triples import TriplesFactory
from ..utils import normalize_string
__all__ = [
'DataSet',
'EagerDataset',
'LazyDataSet',
'PathDataSet',
'RemoteDataSet',
'TarFileRemoteDataSet',
'ZipFileRemoteDataSet',
'PackedZipRemoteDataSet',
'SingleTabbedDataset',
]
logger = logging.getLogger(__name__)
[docs]class DataSet:
"""Contains a lazy reference to a training, testing, and validation data set."""
#: A factory wrapping the training triples
training: TriplesFactory
#: A factory wrapping the testing triples, that share indexes with the training triples
testing: TriplesFactory
#: A factory wrapping the validation triples, that share indexes with the training triples
validation: TriplesFactory
#: All data sets should take care of inverse triple creation
create_inverse_triples: bool
@property
def entity_to_id(self): # noqa: D401
"""The mapping of entity labels to IDs."""
return self.training.entity_to_id
@property
def relation_to_id(self): # noqa: D401
"""The mapping of relation labels to IDs."""
return self.training.relation_to_id
@property
def num_entities(self): # noqa: D401
"""The number of entities."""
return self.training.num_entities
@property
def num_relations(self): # noqa: D401
"""The number of relations."""
return self.training.num_relations
[docs] def summary_str(self) -> str:
"""Make a summary string of all of the factories."""
t = tabulate(
[
(label, triples_factory.num_entities, triples_factory.num_relations, triples_factory.num_triples)
for label, triples_factory in
zip(('Training', 'Testing', 'Validation'), (self.training, self.testing, self.validation))
],
headers=['Name', 'Entities', 'Relations', 'Triples'],
)
return f'{self.__class__.__name__} (create_inverse_triples={self.create_inverse_triples})\n{t}'
[docs] def summarize(self) -> None:
"""Print a summary of the dataset."""
print(self.summary_str())
def __str__(self) -> str: # noqa: D105
return f'{self.__class__.__name__}(num_entities={self.num_entities}, num_relations={self.num_relations})'
[docs] @classmethod
def from_path(cls, path: str, ratios: Optional[List[float]] = None) -> 'DataSet':
"""Create a dataset from a single triples factory by splitting it in 3."""
tf = TriplesFactory(path=path)
return cls.from_tf(tf=tf, ratios=ratios)
[docs] @staticmethod
def from_tf(tf: TriplesFactory, ratios: Optional[List[float]] = None) -> 'DataSet':
"""Create a dataset from a single triples factory by splitting it in 3."""
training, testing, validation = tf.split(ratios or [0.8, 0.1, 0.1])
return EagerDataset(training=training, testing=testing, validation=validation)
[docs] @classmethod
def get_normalized_name(cls) -> str:
"""Get the normalized name of the dataset."""
return normalize_string(cls.__name__)
[docs]class EagerDataset(DataSet):
"""A dataset that has already been loaded."""
def __init__(self, training: TriplesFactory, testing: TriplesFactory, validation: TriplesFactory) -> None:
self.training = training
self.testing = testing
self.validation = validation
self.create_inverse_triples = (
training.create_inverse_triples
and testing.create_inverse_triples
and self.validation.create_inverse_triples
)
[docs]class LazyDataSet(DataSet):
"""A data set that has lazy loading."""
#: The actual instance of the training factory, which is exposed to the user through `training`
_training: Optional[TriplesFactory] = None
#: The actual instance of the testing factory, which is exposed to the user through `testing`
_testing: Optional[TriplesFactory] = None
#: The actual instance of the validation factory, which is exposed to the user through `validation`
_validation: Optional[TriplesFactory] = None
@property
def training(self) -> TriplesFactory: # noqa: D401
"""The training triples factory."""
if not self._loaded:
self._load()
return self._training
@property
def testing(self) -> TriplesFactory: # noqa: D401
"""The testing triples factory that shares indexes with the training triples factory."""
if not self._loaded:
self._load()
return self._testing
@property
def validation(self) -> TriplesFactory: # noqa: D401
"""The validation triples factory that shares indexes with the training triples factory."""
if not self._loaded:
self._load()
if not self._loaded_validation:
self._load_validation()
return self._validation
@property
def _loaded(self) -> bool:
return self._training is not None and self._testing is not None
@property
def _loaded_validation(self):
return self._validation is not None
def _load(self) -> None:
raise NotImplementedError
def _load_validation(self) -> None:
raise NotImplementedError
[docs]class PathDataSet(LazyDataSet):
"""Contains a lazy reference to a training, testing, and validation data set."""
def __init__(
self,
training_path: Union[str, TextIO],
testing_path: Union[str, TextIO],
validation_path: Union[str, TextIO],
eager: bool = False,
create_inverse_triples: bool = False,
) -> None:
"""Initialize the data set.
:param training_path: Path to the training triples file or training triples file.
:param testing_path: Path to the testing triples file or testing triples file.
:param validation_path: Path to the validation triples file or validation triples file.
:param eager: Should the data be loaded eagerly? Defaults to false.
:param create_inverse_triples: Should inverse triples be created? Defaults to false.
"""
self.training_path = training_path
self.testing_path = testing_path
self.validation_path = validation_path
self.create_inverse_triples = create_inverse_triples
if eager:
self._load()
self._load_validation()
def _load(self) -> None:
self._training = TriplesFactory(
path=self.training_path,
create_inverse_triples=self.create_inverse_triples,
)
self._testing = TriplesFactory(
path=self.testing_path,
entity_to_id=self._training.entity_to_id, # share entity index with training
relation_to_id=self._training.relation_to_id, # share relation index with training
)
def _load_validation(self) -> None:
# don't call this function by itself. assumes called through the `validation`
# property and the _training factory has already been loaded
self._validation = TriplesFactory(
path=self.validation_path,
entity_to_id=self._training.entity_to_id, # share entity index with training
relation_to_id=self._training.relation_to_id, # share relation index with training
)
def __repr__(self) -> str: # noqa: D105
return (
f'{self.__class__.__name__}(training_path="{self.training_path}", testing_path="{self.testing_path}",'
f' validation_path="{self.validation_path}")'
)
def _urlretrieve(url, path, clean_on_failure: bool = True) -> None:
"""Download a file from a given URL.
:param url: URL to download
:param path: Path to download the file to
:param clean_on_failure: If true, will delete the file on any exception raised during download
"""
# see https://requests.readthedocs.io/en/master/user/quickstart/#raw-response-content
# pattern from https://stackoverflow.com/a/39217788/5775947
try:
with requests.get(url, stream=True) as response, open(path, 'wb') as file:
shutil.copyfileobj(response.raw, file)
except (Exception, KeyboardInterrupt):
if clean_on_failure:
os.remove(path)
raise
[docs]class RemoteDataSet(PathDataSet):
"""Contains a lazy reference to a remote dataset that is loaded if needed."""
def __init__(
self,
url: str,
relative_training_path: str,
relative_testing_path: str,
relative_validation_path: str,
cache_root: Optional[str] = None,
eager: bool = False,
create_inverse_triples: bool = False,
):
"""Initialize dataset.
:param url:
The url where to download the dataset from.
:param cache_root:
An optional directory to store the extracted files. Is none is given, the default PyKEEN directory is used.
This is defined either by the environment variable ``PYKEEN_HOME`` or defaults to ``~/.pykeen``.
"""
if cache_root is None:
cache_root = PYKEEN_HOME
self.cache_root = os.path.join(cache_root, self.__class__.__name__.lower())
os.makedirs(cache_root, exist_ok=True)
self.url = url
self._relative_training_path = relative_training_path
self._relative_testing_path = relative_testing_path
self._relative_validation_path = relative_validation_path
training_path, testing_path, validation_path = self._get_paths()
super().__init__(
training_path=training_path,
testing_path=testing_path,
validation_path=validation_path,
eager=eager,
create_inverse_triples=create_inverse_triples,
)
def _get_paths(self) -> Tuple[str, str, str]: # noqa: D401
"""The paths where the extracted files can be found."""
return (
os.path.join(self.cache_root, self._relative_training_path),
os.path.join(self.cache_root, self._relative_testing_path),
os.path.join(self.cache_root, self._relative_validation_path),
)
@abstractmethod
def _extract(self, archive_file: BytesIO) -> None:
"""Extract from the downloaded file."""
raise NotImplementedError
def _load(self) -> None: # noqa: D102
all_unpacked = all(
os.path.exists(path) and os.path.isfile(path)
for path in self._get_paths()
)
if not all_unpacked:
logger.info(f'Requesting dataset from {self.url}')
r = requests.get(url=self.url)
assert r.status_code == requests.codes.ok
archive_file = BytesIO(r.content)
self._extract(archive_file=archive_file)
logger.info(f'Extracted to {self.cache_root}.')
super()._load()
[docs]class TarFileRemoteDataSet(RemoteDataSet):
"""A remote dataset stored as a tar file."""
def _extract(self, archive_file: BytesIO) -> None: # noqa: D102
with tarfile.open(fileobj=archive_file) as tf:
tf.extractall(path=self.cache_root)
# TODO replace this with the new zip remote dataset class
[docs]class ZipFileRemoteDataSet(RemoteDataSet):
"""A remote dataset stored as a zip file."""
def _extract(self, archive_file: BytesIO) -> None: # noqa: D102
with zipfile.ZipFile(file=archive_file) as zf:
zf.extractall(path=self.cache_root)
[docs]class PackedZipRemoteDataSet(LazyDataSet):
"""Contains a lazy reference to a remote dataset that is loaded if needed."""
head_column: int = 0
relation_column: int = 1
tail_column: int = 2
sep = '\t'
header = None
def __init__(
self,
relative_training_path: str,
relative_testing_path: str,
relative_validation_path: str,
url: Optional[str] = None,
name: Optional[str] = None,
cache_root: Optional[str] = None,
eager: bool = False,
create_inverse_triples: bool = False,
):
"""Initialize dataset.
:param url:
The url where to download the dataset from
:param name:
The name of the file. If not given, tries to get the name from the end of the URL
:param cache_root:
An optional directory to store the extracted files. Is none is given, the default PyKEEN directory is used.
This is defined either by the environment variable ``PYKEEN_HOME`` or defaults to ``~/.pykeen``.
"""
if cache_root is None:
cache_root = os.path.join(PYKEEN_HOME, self.__class__.__name__.lower())
self.cache_root = cache_root
os.makedirs(cache_root, exist_ok=True)
logger.debug('using cache root at %s', cache_root)
if name is None:
parse_result = urlparse(url)
name = os.path.basename(parse_result.path)
logger.info('parsed name from URL: %s', name)
self.name = name
self.path = os.path.join(self.cache_root, self.name)
logger.debug('file path at %s', self.path)
self.url = url
if not os.path.exists(self.path) and not self.url:
raise ValueError(f'must specify url to download from since path does not exist: {self.path}')
self.relative_training_path = relative_training_path
self.relative_testing_path = relative_testing_path
self.relative_validation_path = relative_validation_path
self.create_inverse_triples = create_inverse_triples
if eager:
self._load()
self._load_validation()
def _load(self) -> None: # noqa: D102
self._training = self._load_helper(self.relative_training_path)
self._testing = self._load_helper(self.relative_testing_path)
def _load_validation(self) -> None:
self._validation = self._load_helper(self.relative_validation_path)
def _load_helper(self, relative_path) -> TriplesFactory:
if not os.path.exists(self.path):
logger.info('downloading data from %s to %s', self.url, self.path)
_urlretrieve(self.url, self.path) # noqa:S310
with zipfile.ZipFile(file=self.path) as zf:
with zf.open(relative_path) as file:
logger.debug('loading %s', relative_path)
df = pd.read_csv(
file,
usecols=[self.head_column, self.relation_column, self.tail_column],
header=self.header,
sep=self.sep,
)
rv = TriplesFactory(
triples=df.values,
create_inverse_triples=self.create_inverse_triples,
)
rv.path = relative_path
return rv
[docs]class SingleTabbedDataset(LazyDataSet):
"""This class is for when you've got a single TSV of edges and want them to get auto-split."""
ratios = (0.8, 0.1, 0.1)
_triples_factory: Optional[TriplesFactory]
def __init__(
self,
url: Optional[str] = None,
name: Optional[str] = None,
cache_root: Optional[str] = None,
eager: bool = False,
create_inverse_triples: bool = False,
random_state: Union[None, int, np.random.RandomState] = None,
):
"""Initialize dataset.
:param url:
The url where to download the dataset from
:param name:
The name of the file. If not given, tries to get the name from the end of the URL
:param cache_root:
An optional directory to store the extracted files. Is none is given, the default PyKEEN directory is used.
This is defined either by the environment variable ``PYKEEN_HOME`` or defaults to ``~/.pykeen``.
"""
if cache_root is None:
cache_root = os.path.join(PYKEEN_HOME, self.__class__.__name__.lower())
self.cache_root = cache_root
os.makedirs(cache_root, exist_ok=True)
logger.debug('using cache root at %s', cache_root)
if name is None:
parse_result = urlparse(url)
name = os.path.basename(parse_result.path)
logger.info('parsed name from URL: %s', name)
self.name = name
self.path = os.path.join(self.cache_root, self.name)
logger.debug('file path at %s', self.path)
self._triples_factory = None
self.random_state = random_state
self.url = url
if not os.path.exists(self.path) and not self.url:
raise ValueError(f'must specify url to download from since path does not exist: {self.path}')
self.create_inverse_triples = create_inverse_triples
self._training = None
self._testing = None
self._validation = None
if eager:
self._load()
def _load(self) -> None:
if not os.path.exists(self.path):
logger.info('downloading data from %s to %s', self.url, self.path)
_urlretrieve(self.url, self.path) # noqa:S310
df = pd.read_csv(self.path, sep='\t')
tf = TriplesFactory(triples=df.values, create_inverse_triples=self.create_inverse_triples)
tf.path = self.path
self._training, self._testing, self._validation = tf.split(
ratios=self.ratios,
random_state=self.random_state,
)
def _load_validation(self) -> None:
pass # already loaded by _load()