# Source code for pykeen.sampling.basic_negative_sampler

# -*- coding: utf-8 -*-

"""Negative sampling algorithm based on the work of of Bordes *et al.*."""

import math
from typing import Collection, Optional

import torch

from .negative_sampler import NegativeSampler

# Names exported by ``from <this module> import *``.
__all__ = ["BasicNegativeSampler"]

# Maps a corruption side ("h", "r" or "t") to the column it occupies in a triple row.
LOOKUP = {side: column for column, side in enumerate(("h", "r", "t"))}

class BasicNegativeSampler(NegativeSampler):
    r"""A basic negative sampler.

    This negative sampler corrupts positive triples $(h,r,t) \in \mathcal{K}$ by replacing either $h$, $r$ or $t$
    based on the chosen corruption scheme. The corruption scheme can contain $h$, $r$ and $t$ or any subset of these.

    Steps:

    1. Determine whether $h$, $r$ or $t$ shall be corrupted for a positive triple $(h,r,t) \in \mathcal{K}$.
       The (repeated) batch is cut into equally sized contiguous slices, one per side of the corruption scheme.
    2. Randomly (uniformly) sample an entity $e \in \mathcal{E}$ or relation $r' \in \mathcal{R}$ for selection to
       corrupt the triple. The sampled value always differs from the value it replaces.

       - If $h$ was selected before, the corrupted triple is $(e,r,t)$
       - If $r$ was selected before, the corrupted triple is $(h,r',t)$
       - If $t$ was selected before, the corrupted triple is $(h,r,e)$
    3. If filtered is set to True, all proposed corrupted triples that also exist as
       actual positive triples $(h,r,t) \in \mathcal{K}$ will be removed.
       (This filtering is not performed by :meth:`corrupt_batch` itself.)
    """

    def __init__(
        self,
        *,
        corruption_scheme: Optional[Collection[str]] = None,
        **kwargs,
    ) -> None:
        """Initialize the basic negative sampler with the given entities.

        :param corruption_scheme:
            What sides ('h', 'r', 't') should be corrupted. Defaults to head and tail ('h', 't').
            ``None`` and an empty collection both select the default.
        :param kwargs:
            Additional keyword based arguments passed to :class:`pykeen.sampling.NegativeSampler`.
        :raises KeyError:
            If the corruption scheme contains a side other than 'h', 'r' or 't'.
        """
        super().__init__(**kwargs)
        self.corruption_scheme = corruption_scheme or ("h", "t")
        # Translate the side names into column indices of a triple row
        self._corruption_indices = [LOOKUP[side] for side in self.corruption_scheme]

    def corrupt_batch(self, positive_batch: torch.LongTensor) -> torch.LongTensor:
        """Create corrupted triples for a batch of positive triples.

        Relies on ``self.num_negs_per_pos``, ``self.num_entities`` and ``self.num_relations``, which are not
        set in this class (presumably provided by the base class -- confirm there).

        :param positive_batch:
            The positive triples; the last dimension is indexed with the columns 0 (head), 1 (relation)
            and 2 (tail).
        :return:
            The corrupted triples, viewed as ``(-1, num_negs_per_pos, 3)``. An empty input yields an empty
            tensor of shape ``(0, num_negs_per_pos, 3)``.
        """
        if self.num_negs_per_pos > 1:
            positive_batch = positive_batch.repeat_interleave(repeats=self.num_negs_per_pos, dim=0)

        # Bind number of negatives to sample
        num_negs = positive_batch.shape[0]

        # Copy positive batch for corruption.
        # Do not detach, as no gradients should flow into the indices.
        negative_batch = positive_batch.clone()

        # Nothing to corrupt: bail out early. Otherwise split_idx would be 0 and range() rejects a zero
        # step; moreover view(-1, ...) cannot infer a dimension for a tensor without elements.
        if num_negs == 0:
            return negative_batch.view(0, self.num_negs_per_pos, 3)

        # Equally corrupt all sides
        split_idx = int(math.ceil(num_negs / len(self._corruption_indices)))

        # zip() stops at the shorter iterable, so if there are fewer slices than sides,
        # only the first sides of the corruption scheme are used.
        for index, start in zip(self._corruption_indices, range(0, num_negs, split_idx)):
            stop = min(start + split_idx, num_negs)

            # Relations have a different index maximum than entities
            # At least make sure to not replace the triples by the original value
            index_max = (self.num_relations if index == 1 else self.num_entities) - 1

            # `high` is exclusive, i.e. values are drawn from [0, index_max - 1]
            negative_batch[start:stop, index] = torch.randint(
                high=index_max,
                size=(stop - start,),
                device=positive_batch.device,
            )

            # To make sure we don't replace the {head, relation, tail} by the original value, we shift all
            # values greater or equal than the original value by one up. For that reason one value less
            # than available is sampled above; after the shift the range is [0, index_max] without the
            # original value.
            negative_batch[start:stop, index] += (
                negative_batch[start:stop, index] >= positive_batch[start:stop, index]
            ).long()

        return negative_batch.view(-1, self.num_negs_per_pos, 3)