Source code for pykeen.nn.emb

# -*- coding: utf-8 -*-

"""Embedding modules."""

from __future__ import annotations

import functools
import itertools
import logging
import warnings
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Any, Callable, Mapping, Optional, Sequence, Tuple, TypeVar, Union, cast

import numpy as np
import torch
import torch.nn
from torch import nn
from torch.nn import functional

from .compositions import CompositionModule, composition_resolver
from .init import (
    init_phases, normal_norm_, uniform_norm_, xavier_normal_, xavier_normal_norm_, xavier_uniform_,
    xavier_uniform_norm_,
)
from .message_passing import Decomposition, decomposition_resolver
from .weighting import EdgeWeighting, SymmetricEdgeWeighting, edge_weight_resolver
from ..regularizers import Regularizer, regularizer_resolver
from ..triples import CoreTriplesFactory
from ..typing import Constrainer, Hint, HintType, Initializer, Normalizer
from ..utils import Bias, activation_resolver, clamp_norm, complex_normalize, convert_to_canonical_shape

__all__ = [
    'RepresentationModule',
    'Embedding',
    'LiteralRepresentation',
    'EmbeddingSpecification',
    'constrainers',
    'initializers',
    'normalizers',
]

logger = logging.getLogger(__name__)


class RepresentationModule(nn.Module, ABC):
    """A base class for obtaining representations for entities/relations.

    A representation module maps integer IDs to representations, which are tensors of floats.

    `max_id` defines the upper bound of indices we are allowed to request (exclusively). For simple embeddings this is
    equivalent to num_embeddings, but it is a more appropriate term for general non-embedding representations, where
    the representations could come from somewhere else, e.g. a GNN encoder.

    `shape` describes the shape of a single representation. In case of a vector embedding, this is just a single
    dimension. For others, e.g. :class:`pykeen.models.RESCAL`, we have 2-d representations, and in general it can be
    any fixed shape.

    We can look at all representations as a tensor of shape `(max_id, *shape)`, and this is exactly the result of
    passing `indices=None` to the forward method. We can also pass multi-dimensional `indices` to the forward method,
    in which case the indices' shape becomes the prefix of the result shape: `(*indices.shape, *self.shape)`.
    """

    #: the maximum ID (exclusively)
    max_id: int

    #: the shape of an individual representation
    shape: Tuple[int, ...]

    def __init__(
        self,
        max_id: int,
        shape: Sequence[int],
    ):
        """Initialize the representation module.

        :param max_id:
            The maximum ID (exclusively). Valid IDs range from 0 to max_id - 1.
        :param shape:
            The shape of an individual representation.
        """
        super().__init__()
        self.max_id = max_id
        self.shape = tuple(shape)
    @abstractmethod
    def forward(
        self,
        indices: Optional[torch.LongTensor] = None,
    ) -> torch.FloatTensor:
        """Get representations for indices.

        :param indices: shape: s
            The indices, or None. If None, this is interpreted as ``torch.arange(self.max_id)`` (although implemented
            more efficiently).

        :return: shape: (``*s``, ``*self.shape``)
            The representations.
        """
    def reset_parameters(self) -> None:
        """Reset the module's parameters."""
    def post_parameter_update(self):
        """Apply constraints which should not be included in gradients."""
    def get_in_canonical_shape(
        self,
        indices: Optional[torch.LongTensor] = None,
    ) -> torch.FloatTensor:
        """Get representations in canonical shape.

        :param indices: None, shape: (b,) or (b, n)
            The indices. If None, return all representations.

        :return: shape: (b?, n?, d)
            If indices is None, b=1, n=max_id.
            If indices is 1-dimensional, b=indices.shape[0] and n=1.
            If indices is 2-dimensional, b, n = indices.shape
        """
        x = self(indices=indices)
        if indices is None:
            x = x.unsqueeze(dim=0)
        elif indices.ndimension() > 2:
            raise ValueError(
                f"Undefined canonical shape for more than 2-dimensional index tensors: {indices.shape}",
            )
        elif indices.ndimension() == 1:
            x = x.unsqueeze(dim=1)
        return x
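    # Illustrative sketch (not part of the original module): shape behaviour of
    # ``get_in_canonical_shape`` for an embedding with ``max_id=10`` and ``shape=(20,)``.
    #
    #   >>> emb = EmbeddingSpecification(shape=(20,)).make(num_embeddings=10)
    #   >>> emb.get_in_canonical_shape(indices=None).shape
    #   torch.Size([1, 10, 20])
    #   >>> emb.get_in_canonical_shape(indices=torch.arange(5)).shape
    #   torch.Size([5, 1, 20])
    #   >>> emb.get_in_canonical_shape(indices=torch.arange(6).view(2, 3)).shape
    #   torch.Size([2, 3, 20])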
    def get_in_more_canonical_shape(
        self,
        dim: Union[int, str],
        indices: Optional[torch.LongTensor] = None,
    ) -> torch.FloatTensor:
        """Get representations in canonical shape.

        The canonical shape is given as (batch_size, d_1, d_2, d_3, ``*``), fulfilling the following properties:

        Let i = dim. If indices is None, the return shape is (1, d_1, d_2, d_3, ``*``) with d_i = num_representations
        and d_j = 1 for j != i. If indices is not None, then batch_size = indices.shape[0], and d_i = 1 if
        indices.ndimension() == 1, else d_i = indices.shape[1]; all other d_j = 1.

        Examples:

        >>> emb = EmbeddingSpecification(shape=(20,)).make(num_embeddings=10)
        >>> # Get head representations for given batch indices
        >>> emb.get_in_more_canonical_shape(dim="h", indices=torch.arange(5)).shape
        torch.Size([5, 1, 1, 1, 20])
        >>> # Get head representations for given 2D batch indices, as e.g. used by fast slcwa scoring
        >>> emb.get_in_more_canonical_shape(dim="h", indices=torch.arange(6).view(2, 3)).shape
        torch.Size([2, 3, 1, 1, 20])
        >>> # Get head representations for 1:n scoring
        >>> emb.get_in_more_canonical_shape(dim="h", indices=None).shape
        torch.Size([1, 10, 1, 1, 20])

        :param dim:
            The dimension along which to expand for ``indices=None``, or ``indices.ndimension() == 2``.
        :param indices:
            The indices. Either None, in which case all embeddings are returned, or a 1 or 2 dimensional index tensor.

        :return: shape: (batch_size, d1, d2, d3, ``*self.shape``)
        """
        r_shape: Tuple[int, ...]
        if indices is None:
            x = self(indices=indices)
            r_shape = (1, self.max_id)
        else:
            flat_indices = indices.view(-1)
            x = self(indices=flat_indices)
            if indices.ndimension() > 1:
                x = x.view(*indices.shape, -1)
            r_shape = tuple(indices.shape)
            if len(r_shape) < 2:
                r_shape = r_shape + (1,)
        return convert_to_canonical_shape(
            x=x,
            dim=dim,
            num=r_shape[1],
            batch_size=r_shape[0],
            suffix_shape=self.shape,
        )
    @property
    def embedding_dim(self) -> int:
        """Return the "embedding dimension". Kept for backward compatibility."""
        # TODO: Remove this property and update code to use shape instead
        warnings.warn("The embedding_dim property is deprecated. Use .shape instead.", DeprecationWarning)
        return int(np.prod(self.shape))

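# Illustrative sketch (not part of the original module): a minimal custom representation
# fulfilling the ``RepresentationModule`` contract by wrapping a fixed tensor. The class
# name is hypothetical.
#
#   class TensorRepresentation(RepresentationModule):
#       def __init__(self, tensor: torch.FloatTensor):
#           super().__init__(max_id=tensor.shape[0], shape=tuple(tensor.shape[1:]))
#           self.register_buffer("tensor", tensor)
#
#       def forward(self, indices: Optional[torch.LongTensor] = None) -> torch.FloatTensor:
#           # (max_id, *shape) for indices=None, else (*indices.shape, *shape)
#           return self.tensor if indices is None else self.tensor[indices]
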
class Embedding(RepresentationModule):
    """Trainable embeddings.

    This class provides the same interface as :class:`torch.nn.Embedding` and can be used throughout PyKEEN as a more
    fully featured drop-in replacement.

    It extends it by adding additional options for normalizing, constraining, or applying dropout.

    When a *normalizer* is selected, it is applied in every forward pass. It can be used, e.g., to ensure that the
    embedding vectors are of unit length. A *constrainer* can be used similarly, but it is applied after each
    parameter update (using the post_parameter_update hook), i.e., outside of the automatic gradient computation.

    The optional dropout can also be used as a regularization technique. Moreover, it enables obtaining uncertainty
    estimates via techniques such as `Monte-Carlo dropout <https://arxiv.org/abs/1506.02142>`_. The following simple
    example shows how to obtain different scores for a single triple from an (untrained) model. These scores can be
    considered as samples from a distribution over the scores.

    >>> from pykeen.datasets import Nations
    >>> dataset = Nations()
    >>> from pykeen.nn.emb import EmbeddingSpecification
    >>> spec = EmbeddingSpecification(embedding_dim=3, dropout=0.1)
    >>> from pykeen.models import ERModel
    >>> model = ERModel(
    ...     triples_factory=dataset.training,
    ...     interaction='distmult',
    ...     entity_representations=spec,
    ...     relation_representations=spec,
    ... )
    >>> import torch
    >>> batch = torch.as_tensor(data=[[0, 1, 0]]).repeat(10, 1)
    >>> scores = model.score_hrt(batch)
    """

    normalizer: Optional[Normalizer]
    constrainer: Optional[Constrainer]
    regularizer: Optional[Regularizer]
    dropout: Optional[nn.Dropout]

    def __init__(
        self,
        num_embeddings: int,
        embedding_dim: Optional[int] = None,
        shape: Union[None, int, Sequence[int]] = None,
        initializer: Hint[Initializer] = None,
        initializer_kwargs: Optional[Mapping[str, Any]] = None,
        normalizer: Hint[Normalizer] = None,
        normalizer_kwargs: Optional[Mapping[str, Any]] = None,
        constrainer: Hint[Constrainer] = None,
        constrainer_kwargs: Optional[Mapping[str, Any]] = None,
        regularizer: Hint[Regularizer] = None,
        regularizer_kwargs: Optional[Mapping[str, Any]] = None,
        trainable: bool = True,
        dtype: Optional[torch.dtype] = None,
        dropout: Optional[float] = None,
    ):
        """Instantiate an embedding with extended functionality.

        :param num_embeddings: >0
            The number of embeddings.
        :param embedding_dim: >0
            The embedding dimensionality.
        :param initializer:
            An optional initializer, which takes an uninitialized (num_embeddings, embedding_dim) tensor as input,
            and returns an initialized tensor of same shape and dtype (which may be the same, i.e. the initialization
            may be in-place). Can be passed as a function, or as string corresponding to a key in
            :data:`pykeen.nn.emb.initializers` such as:

            - ``"xavier_uniform"``
            - ``"xavier_uniform_norm"``
            - ``"xavier_normal"``
            - ``"xavier_normal_norm"``
            - ``"normal"``
            - ``"normal_norm"``
            - ``"uniform"``
            - ``"uniform_norm"``
            - ``"init_phases"``
        :param initializer_kwargs:
            Additional keyword arguments passed to the initializer
        :param normalizer:
            A normalization function, which is applied in every forward pass.
        :param normalizer_kwargs:
            Additional keyword arguments passed to the normalizer
        :param constrainer:
            A function which is applied to the weights after each parameter update, without tracking gradients.
            It may be used to enforce model constraints outside of gradient-based training. The function does not need
            to be in-place, but the weight tensor is modified in-place.
            Can be passed as a function, or as a string corresponding to a key in :data:`pykeen.nn.emb.constrainers`
            such as:

            - ``'normalize'``
            - ``'complex_normalize'``
            - ``'clamp'``
            - ``'clamp_norm'``
        :param constrainer_kwargs:
            Additional keyword arguments passed to the constrainer
        :param regularizer:
            A regularizer, which is applied to the selected embeddings in the forward pass
        :param regularizer_kwargs:
            Additional keyword arguments passed to the regularizer
        :param dropout:
            A dropout value for the embeddings.
        """
        # normalize embedding_dim vs. shape
        _embedding_dim, shape = process_shape(embedding_dim, shape)

        if dtype is None:
            dtype = torch.get_default_dtype()

        # work-around until full complex support
        # TODO: verify that this is our understanding of complex!
        if dtype.is_complex:
            shape = tuple(shape[:-1]) + (2 * shape[-1],)
            _embedding_dim = _embedding_dim * 2

        super().__init__(
            max_id=num_embeddings,
            shape=shape,
        )

        self.initializer = cast(Initializer, _handle(
            initializer, initializers, initializer_kwargs, default=nn.init.normal_, label='initializer',
        ))
        self.normalizer = _handle(normalizer, normalizers, normalizer_kwargs, label='normalizer')
        self.constrainer = _handle(constrainer, constrainers, constrainer_kwargs, label='constrainer')
        if regularizer is not None:
            regularizer = regularizer_resolver.make(regularizer, pos_kwargs=regularizer_kwargs)
        self.regularizer = regularizer

        self._embeddings = torch.nn.Embedding(
            num_embeddings=num_embeddings,
            embedding_dim=_embedding_dim,
        )
        self._embeddings.requires_grad_(trainable)
        self.dropout = None if dropout is None else nn.Dropout(dropout)
    @classmethod
    def init_with_device(
        cls,
        num_embeddings: int,
        embedding_dim: int,
        device: torch.device,
        initializer: Optional[Initializer] = None,
        initializer_kwargs: Optional[Mapping[str, Any]] = None,
        normalizer: Optional[Normalizer] = None,
        normalizer_kwargs: Optional[Mapping[str, Any]] = None,
        constrainer: Optional[Constrainer] = None,
        constrainer_kwargs: Optional[Mapping[str, Any]] = None,
    ) -> 'Embedding':  # noqa:E501
        """Create an embedding object on the given device by wrapping :func:`__init__`.

        This method is a hotfix for not being able to pass a device during initialization of
        :class:`torch.nn.Embedding`. Instead, the weight is always initialized on CPU and has to be moved to GPU
        afterwards.

        .. seealso:: https://developer.nvidia.com/gpugems/gpugems3/part-vi-gpu-computing/chapter-37-efficient-random-number-generation-and-application

        :return:
            The embedding.
        """
        return cls(
            num_embeddings=num_embeddings,
            embedding_dim=embedding_dim,
            initializer=initializer,
            initializer_kwargs=initializer_kwargs,
            normalizer=normalizer,
            normalizer_kwargs=normalizer_kwargs,
            constrainer=constrainer,
            constrainer_kwargs=constrainer_kwargs,
        ).to(device=device)
    @property
    def num_embeddings(self) -> int:  # noqa: D401
        """The total number of representations (i.e. the maximum ID)."""
        # wrapper around max_id, for backward compatibility
        return self.max_id

    @property
    def embedding_dim(self) -> int:  # noqa: D401
        """The representation dimension."""
        return self._embeddings.embedding_dim
    def reset_parameters(self) -> None:  # noqa: D102
        # initialize weights in-place
        self._embeddings.weight.data = self.initializer(
            self._embeddings.weight.data.view(self.num_embeddings, *self.shape),
        ).view(self.num_embeddings, self.embedding_dim)
    def post_parameter_update(self):  # noqa: D102
        # apply constraints in-place
        if self.constrainer is not None:
            self._embeddings.weight.data = self.constrainer(self._embeddings.weight.data)
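    # Illustrative sketch (not part of the original module): the constrainer is applied
    # outside of gradient computation, typically by calling ``post_parameter_update``
    # after each optimizer step.
    #
    #   >>> emb = Embedding(num_embeddings=5, embedding_dim=3, constrainer="normalize")
    #   >>> # ... loss.backward(); optimizer.step() ...
    #   >>> emb.post_parameter_update()  # re-normalizes the weight rows in-place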
    def forward(
        self,
        indices: Optional[torch.LongTensor] = None,
    ) -> torch.FloatTensor:  # noqa: D102
        if indices is None:
            prefix_shape = (self.max_id,)
            x = self._embeddings.weight
        else:
            prefix_shape = indices.shape
            x = self._embeddings(indices)
        x = x.view(*prefix_shape, *self.shape)
        # verify that contiguity is preserved
        assert x.is_contiguous()
        # TODO: move normalizer / regularizer to base class?
        if self.normalizer is not None:
            x = self.normalizer(x)
        if self.regularizer is not None:
            self.regularizer.update(x)
        if self.dropout is not None:
            x = self.dropout(x)
        return x

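# Illustrative sketch (not part of the original module): index handling of ``Embedding.forward``.
#
#   >>> emb = Embedding(num_embeddings=10, shape=(2, 5))
#   >>> emb(indices=None).shape
#   torch.Size([10, 2, 5])
#   >>> emb(indices=torch.arange(6).view(2, 3)).shape
#   torch.Size([2, 3, 2, 5])
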
class LiteralRepresentation(Embedding):
    """Literal representations."""

    def __init__(
        self,
        numeric_literals: torch.FloatTensor,
    ):
        self._numeric_literals = numeric_literals
        num_embeddings, embedding_dim = numeric_literals.shape
        super().__init__(
            num_embeddings=num_embeddings,
            embedding_dim=embedding_dim,
            initializer=self._initialize_literals,
        )
        # freeze
        self._embeddings.requires_grad_(False)

    # use this instead of a lambda to make sure that it can be pickled
    def _initialize_literals(self, _) -> torch.FloatTensor:
        return self._numeric_literals

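# Illustrative sketch (not part of the original module): wrapping a pre-computed literal
# matrix so it can be used wherever a ``RepresentationModule`` is expected. The embeddings
# are frozen, and the literal values are written by ``reset_parameters``.
#
#   >>> literals = torch.rand(10, 3)
#   >>> rep = LiteralRepresentation(numeric_literals=literals)
#   >>> rep.reset_parameters()
#   >>> rep(indices=torch.tensor([0, 2])).shape
#   torch.Size([2, 3])
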
@dataclass
class EmbeddingSpecification:
    """An embedding specification."""

    embedding_dim: Optional[int] = None
    shape: Union[None, int, Sequence[int]] = None

    initializer: Hint[Initializer] = None
    initializer_kwargs: Optional[Mapping[str, Any]] = None

    normalizer: Hint[Normalizer] = None
    normalizer_kwargs: Optional[Mapping[str, Any]] = None

    constrainer: Hint[Constrainer] = None
    constrainer_kwargs: Optional[Mapping[str, Any]] = None

    regularizer: Hint[Regularizer] = None
    regularizer_kwargs: Optional[Mapping[str, Any]] = None

    dtype: Optional[torch.dtype] = None
    dropout: Optional[float] = None
    def make(self, *, num_embeddings: int, device: Optional[torch.device] = None) -> Embedding:
        """Create an embedding with this specification."""
        rv = Embedding(
            num_embeddings=num_embeddings,
            embedding_dim=self.embedding_dim,
            shape=self.shape,
            initializer=self.initializer,
            initializer_kwargs=self.initializer_kwargs,
            normalizer=self.normalizer,
            normalizer_kwargs=self.normalizer_kwargs,
            constrainer=self.constrainer,
            constrainer_kwargs=self.constrainer_kwargs,
            regularizer=self.regularizer,
            regularizer_kwargs=self.regularizer_kwargs,
            dtype=self.dtype,
            dropout=self.dropout,
        )
        if device is not None:
            rv = rv.to(device)
        return rv

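# Illustrative sketch (not part of the original module): a specification defers the number
# of embeddings until construction time, so the same spec can be reused, e.g., for entities
# and relations. String hints such as ``"xavier_uniform"`` are resolved via the
# ``initializers`` / ``constrainers`` dictionaries below.
#
#   >>> spec = EmbeddingSpecification(shape=(2, 2), initializer="xavier_uniform")
#   >>> emb = spec.make(num_embeddings=7)
#   >>> emb.reset_parameters()
#   >>> emb(indices=None).shape
#   torch.Size([7, 2, 2])
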
def process_shape(
    dim: Optional[int],
    shape: Union[None, int, Sequence[int]],
) -> Tuple[int, Sequence[int]]:
    """Make a shape pack."""
    if shape is None and dim is None:
        raise ValueError('Missing both, shape and embedding_dim')
    elif shape is not None and dim is not None:
        raise ValueError('Provided both, shape and embedding_dim')
    elif shape is None and dim is not None:
        shape = (dim,)
    elif isinstance(shape, int) and dim is None:
        dim = shape
        shape = (shape,)
    elif isinstance(shape, Sequence) and dim is None:
        shape = tuple(shape)
        dim = int(np.prod(shape))
    else:
        raise TypeError(f'Invalid type for shape: ({type(shape)}) {shape}')
    return dim, shape


initializers = {
    'xavier_uniform': xavier_uniform_,
    'xavier_uniform_norm': xavier_uniform_norm_,
    'xavier_normal': xavier_normal_,
    'xavier_normal_norm': xavier_normal_norm_,
    'normal': torch.nn.init.normal_,
    'normal_norm': normal_norm_,
    'uniform': torch.nn.init.uniform_,
    'uniform_norm': uniform_norm_,
    'phases': init_phases,
    'init_phases': init_phases,
}

constrainers = {
    'normalize': functional.normalize,
    'complex_normalize': complex_normalize,
    'clamp': torch.clamp,
    'clamp_norm': clamp_norm,
}

# TODO add normalization functions
normalizers: Mapping[str, Normalizer] = {}

X = TypeVar('X', bound=Callable)


def _handle(
    value: Hint[X],
    lookup: Mapping[str, X],
    kwargs,
    default: Optional[X] = None,
    label: Optional[str] = None,
) -> Optional[X]:
    if value is None:
        return default
    elif isinstance(value, str):
        try:
            value = lookup[value]
        except KeyError:
            raise KeyError(f'{value} is an invalid {label}. Try one of: {sorted(lookup)}')
    if kwargs:
        rv = functools.partial(value, **kwargs)  # type: ignore
        return cast(X, rv)
    return value


class RGCNRepresentations(RepresentationModule):
    """Entity representations enriched by R-GCN."""

    def __init__(
        self,
        triples_factory: CoreTriplesFactory,
        embedding_specification: EmbeddingSpecification,
        num_layers: int = 2,
        use_bias: bool = True,
        use_batch_norm: bool = False,
        activation: Hint[nn.Module] = None,
        activation_kwargs: Optional[Mapping[str, Any]] = None,
        edge_dropout: float = 0.4,
        self_loop_dropout: float = 0.2,
        edge_weighting: Hint[EdgeWeighting] = None,
        decomposition: Hint[Decomposition] = None,
        decomposition_kwargs: Optional[Mapping[str, Any]] = None,
    ):
        base_embeddings = embedding_specification.make(num_embeddings=triples_factory.num_entities)
        super().__init__(max_id=triples_factory.num_entities, shape=base_embeddings.shape)
        self.entity_embeddings = base_embeddings

        # Resolve edge weighting
        self.edge_weighting = edge_weight_resolver.make(query=edge_weighting)

        # dropout
        self.edge_dropout = edge_dropout
        self.self_loop_dropout = self_loop_dropout or edge_dropout

        # batch norm and bias
        if use_batch_norm:
            if use_bias:
                logger.warning("Disabling bias because batch normalization is used.")
            use_bias = False

        # Save graph using buffers, such that the tensors are moved together with the model
        h, r, t = triples_factory.mapped_triples.t()
        self.register_buffer("sources", h)
        self.register_buffer("targets", t)
        self.register_buffer("edge_types", r)

        layers = []
        for _ in range(num_layers):
            layers.append(
                decomposition_resolver.make(
                    query=decomposition,
                    pos_kwargs=decomposition_kwargs,
                    input_dim=base_embeddings.embedding_dim,
                    num_relations=triples_factory.num_relations,
                ),
            )
            if use_bias:
                layers.append(Bias(dim=base_embeddings.embedding_dim))
            if use_batch_norm:
                layers.append(nn.BatchNorm1d(num_features=base_embeddings.embedding_dim))
            layers.append(activation_resolver.make(query=activation, pos_kwargs=activation_kwargs))
        self.layers = nn.ModuleList(layers)

        # buffering of enriched representations
        self.enriched_embeddings = None

    def post_parameter_update(self) -> None:  # noqa: D102
        super().post_parameter_update()

        # invalidate enriched embeddings
        self.enriched_embeddings = None

    def reset_parameters(self):  # noqa: D102
        self.entity_embeddings.reset_parameters()

        for m in self.layers:
            if hasattr(m, "reset_parameters"):
                m.reset_parameters()
            elif any(p.requires_grad for p in m.parameters()):
                logger.warning("Layer %s has parameters, but no reset_parameters.", m)

    def _real_forward(self) -> torch.FloatTensor:
        if self.enriched_embeddings is not None:
            return self.enriched_embeddings

        # Bind fields
        # shape: (num_entities, embedding_dim)
        x = self.entity_embeddings(indices=None)
        sources = self.sources
        targets = self.targets
        edge_types = self.edge_types

        # Edge dropout: drop the same edges on all layers (only in training mode)
        if self.training and self.edge_dropout is not None:
            # Get random dropout mask
            edge_keep_mask = torch.rand(self.sources.shape[0], device=x.device) > self.edge_dropout

            # Apply to edges
            sources = sources[edge_keep_mask]
            targets = targets[edge_keep_mask]
            edge_types = edge_types[edge_keep_mask]

        # Different dropout for self-loops (only in training mode)
        if self.training and self.self_loop_dropout is not None:
            node_keep_mask = torch.rand(x.shape[0], device=x.device) > self.self_loop_dropout
        else:
            node_keep_mask = None

        # fixed edges -> pre-compute weights
        if self.edge_weighting is not None and sources.numel() > 0:
            edge_weights = torch.empty_like(sources, dtype=torch.float32)
            for r in range(edge_types.max().item() + 1):
                mask = edge_types == r
                if mask.any():
                    edge_weights[mask] = self.edge_weighting(sources[mask], targets[mask])
        else:
            edge_weights = None

        for layer in self.layers:
            if isinstance(layer, Decomposition):
                kwargs = dict(
                    node_keep_mask=node_keep_mask,
                    source=sources,
                    target=targets,
                    edge_type=edge_types,
                    edge_weights=edge_weights,
                )
            else:
                kwargs = dict()
            x = layer(x, **kwargs)

        # Cache enriched representations
        self.enriched_embeddings = x

        return x

    def forward(
        self,
        indices: Optional[torch.LongTensor] = None,
    ) -> torch.FloatTensor:
        """Enrich the entity embeddings of the decoder using R-GCN message propagation."""
        x = self._real_forward()
        if indices is not None:
            x = x[indices]
        return x


class CompGCNLayer(nn.Module):
    """A single layer of the CompGCN model."""

    def __init__(
        self,
        input_dim: int,
        output_dim: Optional[int] = None,
        dropout: float = 0.0,
        use_bias: bool = True,
        use_relation_bias: bool = False,
        composition: Hint[CompositionModule] = None,
        activation: Hint[nn.Module] = nn.Identity,
        activation_kwargs: Optional[Mapping[str, Any]] = None,
        edge_weighting: HintType[EdgeWeighting] = SymmetricEdgeWeighting,
    ):
        """
        Initialize the module.

        :param input_dim:
            The input dimension.
        :param output_dim:
            The output dimension. If None, equals the input dimension.
        :param dropout:
            The dropout to use for forward and backward edges.
        :param use_bias:  # TODO: do we really need this? it comes before a mandatory batch norm layer
            Whether to use bias.
        :param use_relation_bias:
            Whether to use a bias for the relation transformation.
        :param composition:
            The composition function.
        :param activation:
            The activation to use.
        :param activation_kwargs:
            Additional key-word based arguments passed to the activation.
""" super().__init__() # normalize output dimension output_dim = output_dim or input_dim # entity-relation composition self.composition = composition_resolver.make(composition) # edge weighting self.edge_weighting: EdgeWeighting = edge_weight_resolver.make(edge_weighting) # message passing weights self.w_loop = nn.Parameter(data=torch.empty(input_dim, output_dim)) self.w_fwd = nn.Parameter(data=torch.empty(input_dim, output_dim)) self.w_bwd = nn.Parameter(data=torch.empty(input_dim, output_dim)) # linear relation transformation self.w_rel = nn.Linear(in_features=input_dim, out_features=output_dim, bias=use_relation_bias) # layer-specific self-loop relation representation self.self_loop = nn.Parameter(data=torch.empty(1, input_dim)) # other components self.drop = nn.Dropout(dropout) self.bn = nn.BatchNorm1d(output_dim) self.bias = Bias(output_dim) if use_bias else None self.activation = activation_resolver.make(query=activation, pos_kwargs=activation_kwargs) # initialize self.reset_parameters() def reset_parameters(self): """Reset the model's parameters.""" for w in ( self.w_loop, self.w_fwd, self.w_bwd, self.self_loop, ): nn.init.xavier_uniform_(w) self.bias.reset_parameters() self.w_rel.reset_parameters() def message( self, x_e: torch.FloatTensor, x_r: torch.FloatTensor, edge_index: torch.LongTensor, edge_type: torch.LongTensor, weight: nn.Parameter, ) -> torch.FloatTensor: """ Perform message passing. :param x_e: shape: (num_entities, input_dim) The entity representations. :param x_r: shape: (2 * num_relations, input_dim) The relation representations (including inverse relations). :param edge_index: shape: (2, num_edges) The edge index, pairs of source and target entity for each triple. :param edge_type: shape (num_edges,) The edge type, i.e., relation ID, for each triple. :param weight: The transformation weight. :return: The updated entity representations. """ # split source, target = edge_index # compose m = self.composition(x_e[source], x_r[edge_type]) # transform m = m @ weight # normalization m = m * self.edge_weighting(source=source, target=target).unsqueeze(dim=-1) # aggregate by sum x_e = x_e.new_zeros(x_e.shape[0], m.shape[1]).index_add(dim=0, index=target, source=m) # dropout x_e = self.drop(x_e) return x_e def forward( self, x_e: torch.FloatTensor, x_r: torch.FloatTensor, edge_index: torch.LongTensor, edge_type: torch.LongTensor, ) -> Tuple[torch.FloatTensor, torch.FloatTensor]: r""" Update entity and relation representations. .. math :: X_E'[e] = \frac{1}{3} \left( X_E W_s + \left( \sum_{h,r,e \in T} \alpha(h, e) \phi(X_E[h], X_R[r]) W_f \right) + \left( \sum_{e,r,t \in T} \alpha(e, t) \phi(X_E[t], X_R[r^{-1}]) W_b \right) \right) :param x_e: shape: (num_entities, input_dim) The entity representations. :param x_r: shape: (2 * num_relations, input_dim) The relation representations (including inverse relations). :param edge_index: shape: (2, num_edges) The edge index, pairs of source and target entity for each triple. :param edge_type: shape (num_edges,) The edge type, i.e., relation ID, for each triple. :return: shape: (num_entities, output_dim) / (2 * num_relations, output_dim) The updated entity and relation representations. 
""" # prepare for inverse relations edge_type = 2 * edge_type # update entity representations: mean over self-loops / forward edges / backward edges x_e = ( self.composition(x_e, self.self_loop) @ self.w_loop + self.message(x_e=x_e, x_r=x_r, edge_index=edge_index, edge_type=edge_type, weight=self.w_fwd) + self.message(x_e=x_e, x_r=x_r, edge_index=edge_index.flip(0), edge_type=edge_type + 1, weight=self.w_bwd) ) / 3 if self.bias: x_e = self.bias(x_e) x_e = self.bn(x_e) x_e = self.activation(x_e) # Relation transformation x_r = self.w_rel(x_r) return x_e, x_r class CombinedCompGCNRepresentations(nn.Module): """A sequence of CompGCN layers.""" # Buffered enriched entity and relation representations enriched_representations: Optional[Tuple[torch.FloatTensor, torch.FloatTensor]] def __init__( self, *, triples_factory: CoreTriplesFactory, embedding_specification: EmbeddingSpecification, num_layers: Optional[int] = 1, dims: Union[None, int, Sequence[int]] = None, layer_kwargs: Optional[Mapping[str, Any]] = None, ): """ Initialize the combined entity and relation representation module. :param triples_factory: The triples factory containing the training triples. :param embedding_specification: An embedding specification for the base entity and relation representations. :param num_layers: The number of message passing layers to use. If None, will be inferred by len(dims), i.e., requires dims to be a sequence / list. :param dims: The hidden dimensions to use. If None, defaults to the embedding dimension of the base representations. If an integer, is the same for all layers. The last dimension is equal to the output dimension. :param layer_kwargs: Additional key-word based parameters passed to the individual layers; cf. CompGCNLayer. """ super().__init__() # TODO: Check assert triples_factory.create_inverse_triples self.entity_representations = embedding_specification.make( num_embeddings=triples_factory.num_entities, ) self.relation_representations = embedding_specification.make( num_embeddings=2 * triples_factory.real_num_relations, ) input_dim = self.entity_representations.embedding_dim assert self.relation_representations.embedding_dim == input_dim # hidden dimension normalization if dims is None: dims = input_dim if isinstance(dims, int): if num_layers is None: raise ValueError else: dims = [dims] * num_layers if len(dims) != num_layers: raise ValueError( f"The number of provided dimensions ({len(dims)}) must equal the number of layers ({num_layers}).", ) self.output_dim = dims[-1] # Create message passing layers layers = [] for input_dim, output_dim in zip(itertools.chain([input_dim], dims), dims): layers.append(CompGCNLayer( input_dim=input_dim, output_dim=output_dim, **(layer_kwargs or {}), )) self.layers = nn.ModuleList(layers) # register buffers for adjacency matrix; we use the same format as PyTorch Geometric # TODO: This always uses all training triples for message passing self.register_buffer(name="edge_index", tensor=triples_factory.mapped_triples[:, [0, 2]].t()) self.register_buffer(name="edge_type", tensor=triples_factory.mapped_triples[:, 1]) # initialize buffer of enriched representations self.enriched_representations = None def post_parameter_update(self) -> None: # noqa: D102 # invalidate enriched embeddings self.enriched_representations = None def forward( self, ) -> Tuple[torch.FloatTensor, torch.FloatTensor]: """Compute enriched representations.""" if self.enriched_representations is None: x_e = self.entity_representations() x_r = self.relation_representations() # enrich for 
layer in self.layers: x_e, x_r = layer(x_e=x_e, x_r=x_r, edge_index=self.edge_index, edge_type=self.edge_type) self.enriched_representations = (x_e, x_r) return self.enriched_representations def split(self) -> Tuple["SingleCompGCNRepresentation", "SingleCompGCNRepresentation"]: """Return the separated representations.""" return ( SingleCompGCNRepresentation(self, position=0), SingleCompGCNRepresentation(self, position=1), ) class SingleCompGCNRepresentation(RepresentationModule): """A wrapper around the combined representation module.""" def __init__( self, combined: CombinedCompGCNRepresentations, position: int = 0, ): """ Initialize the module. :param combined: The combined representations. :param position: The position, either 0 for entities, or 1 for relations. """ if position == 0: # entity max_id = combined.entity_representations.max_id shape = (combined.output_dim,) elif position == 1: # relation max_id = combined.relation_representations.max_id shape = (combined.output_dim,) else: raise ValueError super().__init__(max_id=max_id, shape=shape) self.combined = combined self.position = position self.reset_parameters() def forward( self, indices: Optional[torch.LongTensor] = None, ) -> torch.FloatTensor: # noqa: D102 x = self.combined()[self.position] if indices is not None: x = x[indices] return x
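# Illustrative usage sketch (not part of the original module; assumes a triples factory
# created with inverse triples, as required by the assertion in
# ``CombinedCompGCNRepresentations.__init__``):
#
#   >>> from pykeen.datasets import Nations
#   >>> tf = Nations(create_inverse_triples=True).training
#   >>> combined = CombinedCompGCNRepresentations(
#   ...     triples_factory=tf,
#   ...     embedding_specification=EmbeddingSpecification(embedding_dim=32),
#   ... )
#   >>> entity_repr, relation_repr = combined.split()
#   >>> entity_repr(indices=None).shape    # -> (num_entities, output_dim)
#   >>> relation_repr(indices=None).shape  # -> (2 * num_relations, output_dim)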