# -*- coding: utf-8 -*-
"""Implementation of wrapper around sklearn metrics."""
from typing import Mapping, MutableMapping, Optional, Tuple, Type, cast
import numpy as np
import torch
from .evaluator import Evaluator, MetricResults
from ..constants import TARGET_TO_INDEX
from ..metrics.classification import ClassificationMetric, classification_metric_resolver
from ..typing import MappedTriples, Target
__all__ = [
"ClassificationEvaluator",
"ClassificationMetricResults",
]
CLASSIFICATION_METRICS: Mapping[str, Type[ClassificationMetric]] = {
cls().key: cls for cls in classification_metric_resolver
}
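# NOTE: the concrete keys depend on which metrics are registered with
# ``classification_metric_resolver``; conceptually, the mapping has one entry of the
# form ``"<metric-key>" -> <metric class>`` per resolvable classification metric.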
class ClassificationMetricResults(MetricResults):
    """Results from computing classification metrics."""
metrics = CLASSIFICATION_METRICS
    @classmethod
    def from_scores(cls, y_true: np.ndarray, y_score: np.ndarray) -> "ClassificationMetricResults":
        """Return an instance of these metrics from a given set of true labels and scores."""
if y_true.size == 0:
raise ValueError(f"Cannot calculate scores from empty array (y_true.shape={y_true.shape}).")
data = dict()
for key, metric in CLASSIFICATION_METRICS.items():
value = metric.score(y_true, y_score)
if isinstance(value, np.number):
# TODO: fix this upstream / make metric.score comply to signature
value = value.item()
data[key] = value
data["num_scores"] = y_score.size
return cls(data=data)
# docstr-coverage: inherited
    def get_metric(self, name: str) -> float:  # noqa: D102
if name not in self.data:
raise KeyError(f"Unknown metric: '{name}'. Possible options are: {sorted(self.data.keys())}")
return self.data[name]
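# Usage sketch for the results container (illustrative values only; the set of
# available metric keys depends on ``classification_metric_resolver``):
#
#     y_true = np.array([1, 0, 0, 1, 0])
#     y_score = np.array([0.9, 0.2, 0.4, 0.7, 0.1])
#     results = ClassificationMetricResults.from_scores(y_true, y_score)
#     results.get_metric("num_scores")  # -> 5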
class ClassificationEvaluator(Evaluator):
    """An evaluator that uses classification metrics."""
    #: dense score arrays, keyed by the prediction target and the IDs of the two non-target columns
    all_scores: MutableMapping[Tuple[Target, int, int], np.ndarray]

    #: dense positive masks, using the same keys as ``all_scores``
    all_positives: MutableMapping[Tuple[Target, int, int], np.ndarray]
def __init__(self, **kwargs):
"""
Initialize the evaluator.
:param kwargs:
keyword-based parameters passed to :meth:`Evaluator.__init__`.
"""
super().__init__(
filtered=False,
requires_positive_mask=True,
**kwargs,
)
self.all_scores = {}
self.all_positives = {}
# docstr-coverage: inherited
    def process_scores_(
self,
hrt_batch: MappedTriples,
target: Target,
scores: torch.FloatTensor,
true_scores: Optional[torch.FloatTensor] = None,
dense_positive_mask: Optional[torch.FloatTensor] = None,
    ) -> None:  # noqa: D102
        if dense_positive_mask is None:
            raise ValueError("The classification evaluator requires the dense positive mask!")
# Transfer to cpu and convert to numpy
scores = scores.detach().cpu().numpy()
dense_positive_mask = dense_positive_mask.detach().cpu().numpy()
remaining = [i for i in range(hrt_batch.shape[1]) if i != TARGET_TO_INDEX[target]]
keys = hrt_batch[:, remaining].detach().cpu().numpy()
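        # e.g., for tail prediction, the remaining columns are (head, relation), so
        # each row of ``keys`` identifies one (h, r)-combination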
# Ensure that each key gets counted only once
        for i in range(keys.shape[0]):
            # include the prediction target in the key to differentiate between head,
            # relation, and tail prediction for the same pair of remaining IDs
            key_suffix = tuple(map(int, keys[i]))
            assert len(key_suffix) == 2
            key_suffix = cast(Tuple[int, int], key_suffix)
            key = (target,) + key_suffix
self.all_scores[key] = scores[i]
self.all_positives[key] = dense_positive_mask[i]
# docstr-coverage: inherited
    def clear(self) -> None:  # noqa: D102
self.all_positives.clear()
self.all_scores.clear()
# docstr-coverage: inherited
    def finalize(self) -> ClassificationMetricResults:  # noqa: D102
        # Retrieve scores and masks in the exact same key order, so that the entries
        # of y_score and y_true remain aligned.
all_keys = list(self.all_scores.keys())
y_score = np.concatenate([self.all_scores[k] for k in all_keys], axis=0).flatten()
y_true = np.concatenate([self.all_positives[k] for k in all_keys], axis=0).flatten()
# Clear buffers
self.clear()
return ClassificationMetricResults.from_scores(y_true, y_score)
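# End-to-end usage sketch (hypothetical variable names; assumes a trained PyKEEN
# model and the mapped triples to evaluate on):
#
#     evaluator = ClassificationEvaluator()
#     results = evaluator.evaluate(model=model, mapped_triples=testing_mapped_triples)
#     print(results.get_metric("num_scores"))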