Source code for VmaxBuilder.GPR.gpr_preprocessing

"""Generated: validation needed.

Description:
    Reusable preprocessing utilities for parsing GPR rules into independently
    functioning protein (IFP) complexes.
"""

from __future__ import annotations

import re
from collections.abc import Iterable, Mapping, Sequence
from dataclasses import dataclass
from functools import lru_cache
from itertools import product
from typing import Any

from cobra import Model

# Matches every supported boolean operator spelling: the words `and`/`or`
# (case-insensitive, whole words only) and the symbolic forms `&&`, `||`,
# `&`, `|`. Used by `_normalise_gpr_rule` to rewrite them as lowercase words.
_GPR_OPERATOR_PATTERN = re.compile(r"\b(and|or)\b|&&|\|\||[|&]", flags=re.IGNORECASE)
# Splits a normalised rule into tokens: a single parenthesis, the word
# `and`/`or`, or a run of characters containing no whitespace or parentheses
# (a gene identifier).
_GPR_TOKEN_PATTERN = re.compile(r"\(|\)|\band\b|\bor\b|[^\s()]+", flags=re.IGNORECASE)
# Any run of whitespace; collapsed to a single space during normalisation.
_WHITESPACE_PATTERN = re.compile(r"\s+")
# Whole-word `and` (case-insensitive); used to split a textual gene-level IFP
# back into its gene tokens.
_GPR_AND_SPLIT_PATTERN = re.compile(r"\band\b", flags=re.IGNORECASE)
# Parse-tree node: either a gene identifier (leaf) or an
# `(operator, left, right)` tuple. The parser in this module emits the
# operator tags "AND" and "OR".
# NOTE(review): this alias is evaluated at import time, so `str | tuple[...]`
# presumably requires Python 3.10+ — confirm the supported interpreter range.
_GPRNode = str | tuple[str, "_GPRNode", "_GPRNode"]


class TranscriptIfpExpansionOutcome(dict[str, Any]):
    """Generated: validation needed.

    Description:
        Dictionary payload describing the result of expanding a single
        gene-level IFP into transcript-level IFPs. It adds no behaviour on top
        of ``dict``; the subclass only gives the payload a recognisable type.
        Instances built by ``expand_gene_ifp_to_transcript_ifps`` carry the
        keys ``transcript_ifps``, ``expansion_count``, ``exceeded_threshold``
        and ``transcripts_used_by_gene``.
    """
@dataclass(frozen=True)
class _TokenCursor:
    """Generated: validation needed.

    Description:
        Immutable read position over a token tuple, used by the
        recursive-descent GPR parser. Consuming a token never mutates the
        cursor; it yields a new cursor one position further on.

    Args:
        tokens (tuple[str, ...]): Token stream being parsed.
        position (int): Index of the next token to read.
    """

    tokens: tuple[str, ...]
    position: int = 0

    def peek(self) -> str | None:
        """Generated: validation needed.

        Description:
            Look at the next token without consuming it.

        Returns:
            str | None: The next token, or ``None`` once the stream is
            exhausted.
        """
        has_more = self.position < len(self.tokens)
        return self.tokens[self.position] if has_more else None

    def advance(self) -> tuple[str, _TokenCursor]:
        """Generated: validation needed.

        Description:
            Consume the next token.

        Returns:
            tuple[str, _TokenCursor]: The consumed token and a new cursor
            positioned just after it.

        Raises:
            ValueError: When no tokens remain.
        """
        current = self.peek()
        if current is not None:
            return current, _TokenCursor(self.tokens, self.position + 1)
        raise ValueError("Unexpected end of GPR expression.")
@lru_cache(maxsize=4096)
def simplify_gpr_rule_cached(gpr_rule: str) -> tuple[tuple[str, ...], ...]:
    """Generated: validation needed.

    Description:
        Normalise, tokenise and parse one GPR rule, expand the parse tree into
        `and`-only gene groups (IFPs), and pass them through
        ``_deduplicate_ifps``. Results are memoised per raw rule string with
        an LRU cache of 4096 entries.

    Args:
        gpr_rule (str): Raw GPR rule.

    Returns:
        tuple[tuple[str, ...], ...]: Deduplicated IFPs, each a tuple of gene
        identifiers.

    Raises:
        ValueError: When the rule is empty or malformed, including when tokens
            remain after a complete expression has been parsed.
    """
    tokens = _tokenise_gpr_rule(_normalise_gpr_rule(gpr_rule))
    tree, remaining = _parse_or_expression(_TokenCursor(tokens=tokens))

    # A well-formed rule is consumed entirely by the top-level expression.
    leftover = remaining.peek()
    if leftover is not None:
        raise ValueError(f"Unexpected trailing token in GPR rule: '{leftover}'.")

    return _deduplicate_ifps(_expand_tree_to_gene_ifps(tree))
def simplify_gpr_rule(gpr_rule: str) -> list[str]:
    """Generated: validation needed.

    Description:
        Render the IFPs of one raw GPR rule as text, joining the genes of each
        IFP with ``" and "``. Parsing is delegated to the cached
        ``simplify_gpr_rule_cached``.

    Args:
        gpr_rule (str): Raw GPR rule.

    Returns:
        list[str]: One ``"gene and gene ..."`` string per IFP, in the order
        returned by ``simplify_gpr_rule_cached``.
    """
    gene_groups = simplify_gpr_rule_cached(gpr_rule)
    return list(map(" and ".join, gene_groups))
def build_ifp_mapping_from_gpr_rules(gpr_rules: set[str]) -> dict[str, dict[str, Any]]:
    """Generated: validation needed.

    Description:
        Simplify every rule in ``gpr_rules`` and collect the results in a
        dictionary keyed by the raw rule. Rules are processed in sorted order,
        which is also the insertion order of the returned dictionary.

    Args:
        gpr_rules (set[str]): Unique GPR rules.

    Returns:
        dict[str, dict[str, Any]]: For each rule, a payload with
        ``simplified_gene_ifps`` (list of IFP strings) and ``expansion_count``
        (number of those strings).
    """
    ordered_rules = sorted(gpr_rules)
    simplified_per_rule = [simplify_gpr_rule(rule) for rule in ordered_rules]
    return {
        rule: {
            "simplified_gene_ifps": gene_ifps,
            "expansion_count": len(gene_ifps),
        }
        for rule, gene_ifps in zip(ordered_rules, simplified_per_rule)
    }
def build_reaction_ifp_indexes(
    model: Model,
    ifp_mapping: Mapping[str, Mapping[str, Any]],
) -> tuple[dict[str, list[str]], dict[str, list[str]]]:
    """Generated: validation needed.

    Description:
        Build the two lookup tables linking reactions and IFPs. A reaction is
        indexed only when its stripped GPR rule is non-empty, is a key of
        ``ifp_mapping``, and that entry lists at least one IFP under
        ``simplified_gene_ifps``.

    Args:
        model (cobra.Model): Model whose ``reactions`` expose ``id`` and
            ``gene_reaction_rule``.
        ifp_mapping (Mapping[str, Mapping[str, Any]]): Per-rule IFP payload,
            keyed by stripped GPR rule.

    Returns:
        tuple[dict[str, list[str]], dict[str, list[str]]]:
        ``reaction_to_ifps`` (reaction id -> sorted unique IFP strings) and
        ``ifp_to_reactions`` (IFP string -> sorted unique reaction ids).
    """
    reaction_to_ifps: dict[str, list[str]] = {}
    reactions_by_ifp: dict[str, set[str]] = {}

    for reaction in model.reactions:
        rule_text = reaction.gene_reaction_rule.strip()
        payload = ifp_mapping.get(rule_text) if rule_text else None
        if payload is None:
            continue

        ifp_names = sorted({str(entry) for entry in payload.get("simplified_gene_ifps", [])})
        if not ifp_names:
            continue

        reaction_to_ifps[reaction.id] = ifp_names
        for ifp_name in ifp_names:
            reactions_by_ifp.setdefault(ifp_name, set()).add(reaction.id)

    ifp_to_reactions = {
        ifp_name: sorted(reaction_ids)
        for ifp_name, reaction_ids in reactions_by_ifp.items()
    }
    return reaction_to_ifps, ifp_to_reactions
def build_gene_to_transcripts_mapping(
    mapping_artifact: Any,
) -> dict[str, tuple[str, ...]]:
    """Generated: validation needed.

    Description:
        Convert a scaffold mapping artifact into a canonical gene->transcript
        dictionary. ``Mapping`` inputs and objects exposing both
        ``__getitem__`` and ``columns`` (dataframe-like) are supported; any
        other input produces an empty result.

    Args:
        mapping_artifact (Any): Either ``dict[gene_id, transcripts]`` or a
            dataframe-like object with ``gene_id`` and ``transcript_id``
            columns.

    Returns:
        dict[str, tuple[str, ...]]: Gene id -> sorted tuple of unique
        transcript ids. Genes with no transcripts are omitted.
    """
    collected: dict[str, set[str]] = {}

    if isinstance(mapping_artifact, Mapping):
        _collect_transcripts_from_mapping(mapping_artifact, collected)
    else:
        looks_like_dataframe = hasattr(mapping_artifact, "__getitem__") and hasattr(
            mapping_artifact, "columns"
        )
        if looks_like_dataframe:
            _collect_transcripts_from_dataframe_like(mapping_artifact, collected)

    canonical: dict[str, tuple[str, ...]] = {}
    for gene_id, transcript_ids in collected.items():
        if transcript_ids:
            canonical[gene_id] = tuple(sorted(transcript_ids))
    return canonical
def get_unique_gpr_rules(cobra_model: Model) -> set[str]:
    """Generated: validation needed.

    Description:
        Collect the distinct, non-empty GPR rules of a model. Each rule is
        stripped of surrounding whitespace before it is compared or stored.

    Args:
        cobra_model (Model): COBRApy model object.

    Returns:
        set[str]: Unique stripped GPR rules.

    Raises:
        ValueError: When ``cobra_model`` is ``None``.
    """
    if cobra_model is None:
        raise ValueError("Cobra model not found in scaffold artifacts.")

    stripped_rules = (
        reaction.gene_reaction_rule.strip() for reaction in cobra_model.reactions
    )
    return {rule for rule in stripped_rules if rule}
def _identifier_or_empty(value: Any) -> str:
    """Generated: validation needed.

    Description:
        Convert one gene/transcript identifier to ``str``, mapping missing
        values to the empty string so callers can skip them with a plain
        truthiness check.

    Args:
        value (Any): Raw identifier.

    Returns:
        str: ``str(value)``, or ``""`` when the value is ``None``, is unequal
        to itself (NaN-like), or cannot be compared to itself at all.
    """
    if value is None:
        return ""
    try:
        # NaN-like values are the ones that compare unequal to themselves.
        is_nan = bool(value != value)
    except (TypeError, ValueError):
        # The self-comparison has no usable truth value (e.g. an NA scalar
        # that refuses bool()); treat the identifier as missing.
        return ""
    return "" if is_nan else str(value)


def _collect_transcripts_from_mapping(
    mapping_artifact: Mapping[Any, Any],
    gene_to_transcripts: dict[str, set[str]],
) -> None:
    """Generated: validation needed.

    Description:
        Merge dict-like mapping artifact entries into the canonical
        gene->transcript store. A value may be a single transcript id or any
        iterable of ids (list, tuple, set, ...). Missing ids (``None``, NaN,
        empty string) and missing/empty gene keys are skipped rather than
        stored as literal ``"None"``/``"nan"`` entries.

    Args:
        mapping_artifact (Mapping[Any, Any]): Dict-like gene->transcript
            mapping.
        gene_to_transcripts (dict[str, set[str]]): Mutable accumulator.

    Modifies:
        gene_to_transcripts dictionary.
    """
    for gene_id, transcript_values in mapping_artifact.items():
        gene_key = _identifier_or_empty(gene_id)
        if not gene_key:
            continue

        # A string is iterable but denotes exactly one transcript id; every
        # other iterable (not just Sequence) is a collection of ids.
        if isinstance(transcript_values, str) or not isinstance(transcript_values, Iterable):
            candidates: Iterable[Any] = (transcript_values,)
        else:
            candidates = transcript_values

        for candidate in candidates:
            transcript_id = _identifier_or_empty(candidate)
            if transcript_id:
                gene_to_transcripts.setdefault(gene_key, set()).add(transcript_id)


def _collect_transcripts_from_dataframe_like(
    mapping_artifact: Any,
    gene_to_transcripts: dict[str, set[str]],
) -> None:
    """Generated: validation needed.

    Description:
        Merge dataframe-like transcript mapping rows into the canonical
        gene->transcript store. Nothing is merged unless both ``gene_id`` and
        ``transcript_id`` columns are present. Rows where either cell is
        missing (``None``, NaN, empty string) are skipped.

    Args:
        mapping_artifact (Any): Dataframe-like object exposing ``columns``,
            list-based column selection and ``iterrows()``.
        gene_to_transcripts (dict[str, set[str]]): Mutable accumulator.

    Modifies:
        gene_to_transcripts dictionary.
    """
    columns = set(mapping_artifact.columns)
    if not {"gene_id", "transcript_id"}.issubset(columns):
        return

    for _, row in mapping_artifact[["gene_id", "transcript_id"]].iterrows():
        # Missing cells are detected before stringification; otherwise they
        # would survive the emptiness check as the text "nan" / "None".
        gene_id = _identifier_or_empty(row["gene_id"])
        transcript_id = _identifier_or_empty(row["transcript_id"])
        if gene_id and transcript_id:
            gene_to_transcripts.setdefault(gene_id, set()).add(transcript_id)
def expand_gene_ifp_to_transcript_ifps(
    gene_ifp: str,
    gene_to_transcripts: Mapping[str, Sequence[str]],
    *,
    maximum_expansion: int,
) -> TranscriptIfpExpansionOutcome:
    """Generated: validation needed.

    Description:
        Expand one gene-level IFP into transcript-level IFPs. Each gene token
        is replaced by every transcript mapped to it, and the replacements are
        combined as a Cartesian product. A gene without mapped transcripts is
        kept as its own token. The product is only materialised when its size
        does not exceed ``maximum_expansion``.

    Args:
        gene_ifp (str): Gene-level IFP string joined by `and`.
        gene_to_transcripts (Mapping[str, Sequence[str]]): Gene->transcript
            mapping. A bare string value is treated as a single transcript id,
            and repeated ids within one gene are counted once.
        maximum_expansion (int): Maximum allowed expansion count.

    Returns:
        TranscriptIfpExpansionOutcome: Payload with ``transcript_ifps``
        (sorted list, empty when the threshold is exceeded),
        ``expansion_count``, ``exceeded_threshold`` and
        ``transcripts_used_by_gene``.
    """
    gene_tokens = [
        stripped_token
        for raw_token in _GPR_AND_SPLIT_PATTERN.split(gene_ifp)
        if (stripped_token := raw_token.strip())
    ]
    if not gene_tokens:
        return TranscriptIfpExpansionOutcome(
            transcript_ifps=[],
            expansion_count=0,
            exceeded_threshold=False,
            transcripts_used_by_gene={},
        )

    transcript_choices: list[tuple[str, ...]] = []
    transcripts_used_by_gene: dict[str, tuple[str, ...]] = {}
    for gene_token in gene_tokens:
        mapped_transcripts = gene_to_transcripts.get(gene_token)
        if isinstance(mapped_transcripts, str):
            # A str is a Sequence of characters; iterating it would split one
            # transcript id into single-letter "transcripts".
            mapped_transcripts = (mapped_transcripts,) if mapped_transcripts else ()
        if mapped_transcripts:
            # dict.fromkeys drops repeated ids while preserving order, so
            # duplicates cannot inflate the expansion count or trip the
            # threshold for combinations that would collapse anyway.
            transcript_group = tuple(
                dict.fromkeys(str(transcript_id) for transcript_id in mapped_transcripts)
            )
        else:
            # Keep unmapped gene token as fallback to avoid data loss.
            transcript_group = (gene_token,)
        transcript_choices.append(transcript_group)
        transcripts_used_by_gene[gene_token] = transcript_group

    # Check the size before building the product so an oversized expansion
    # never has to be materialised.
    expansion_count = calculate_ifp_expansion_count(transcript_choices)
    if expansion_count > maximum_expansion:
        return TranscriptIfpExpansionOutcome(
            transcript_ifps=[],
            expansion_count=expansion_count,
            exceeded_threshold=True,
            transcripts_used_by_gene=transcripts_used_by_gene,
        )

    transcript_ifps = {
        " and ".join(transcript_tuple)
        for transcript_tuple in product(*transcript_choices)
    }
    return TranscriptIfpExpansionOutcome(
        transcript_ifps=sorted(transcript_ifps),
        expansion_count=expansion_count,
        exceeded_threshold=False,
        transcripts_used_by_gene=transcripts_used_by_gene,
    )
def calculate_ifp_expansion_count(transcript_choices: Sequence[Sequence[str]]) -> int:
    """Generated: validation needed.

    Description:
        Count how many transcript-level IFPs one gene-level IFP expands into:
        the product of the number of options per gene token. A token with no
        options counts as one, so it never zeroes the product.

    Args:
        transcript_choices (Sequence[Sequence[str]]): Replacement options per
            gene token.

    Returns:
        int: Expansion count; ``1`` when ``transcript_choices`` is empty.
    """
    total = 1
    for options in transcript_choices:
        option_count = len(options)
        # Groups of size 0 or 1 contribute a factor of one.
        if option_count > 1:
            total *= option_count
    return total
def clear_simplification_cache() -> None:
    """Generated: validation needed.

    Description:
        Empty the LRU cache behind ``simplify_gpr_rule_cached`` so that later
        calls re-parse their rules, e.g. for deterministic benchmarks.
    """
    simplify_gpr_rule_cached.cache_clear()
def get_simplification_cache_info() -> dict[str, int]:
    """Generated: validation needed.

    Description:
        Report the current statistics of the ``simplify_gpr_rule_cached`` LRU
        cache as a plain dictionary.

    Returns:
        dict[str, int]: ``hits``, ``misses``, ``maxsize`` and ``currsize``. A
        falsy ``maxsize`` (such as ``None``) is reported as ``0``.
    """
    hits, misses, maxsize, currsize = simplify_gpr_rule_cached.cache_info()
    return {
        "hits": hits,
        "misses": misses,
        "maxsize": maxsize or 0,
        "currsize": currsize,
    }
def _normalise_gpr_rule(gpr_rule: str) -> str:
    """Generated: validation needed.

    Description:
        Canonicalise mixed symbolic/text GPR operators into lowercase words.
        The symbols ``∨``/``∧``, ``||``/``&&``, ``|``/``&`` and any casing of
        ``and``/``or`` become `` or ``/`` and ``. Parentheses are padded with
        spaces and whitespace runs collapse to a single space.

    Args:
        gpr_rule (str): Raw GPR rule.

    Returns:
        str: Normalized GPR string.

    Raises:
        ValueError: When the rule is empty or whitespace only.
    """
    stripped_rule = gpr_rule.strip().replace("∨", " or ").replace("∧", " and ")
    if not stripped_rule:
        raise ValueError("GPR rule is empty.")

    def _replace_operator(match: re.Match[str]) -> str:
        # The pattern only matches OR-like or AND-like spellings, so anything
        # that is not OR-like must be AND-like.
        matched_token = match.group(0).lower()
        if matched_token in {"or", "|", "||"}:
            return " or "
        return " and "

    normalised_rule = _GPR_OPERATOR_PATTERN.sub(_replace_operator, stripped_rule)
    normalised_rule = normalised_rule.replace("(", " ( ").replace(")", " ) ")
    return _WHITESPACE_PATTERN.sub(" ", normalised_rule).strip()


def _tokenise_gpr_rule(normalised_gpr_rule: str) -> tuple[str, ...]:
    """Generated: validation needed.

    Description:
        Tokenize normalized GPR text into operators, parentheses, and gene
        symbols. Operators are lowercased; gene symbols keep their casing.

    Args:
        normalised_gpr_rule (str): Canonicalized GPR rule.

    Returns:
        tuple[str, ...]: Ordered token stream.

    Raises:
        ValueError: When no token can be extracted.
    """
    tokens = [
        token.strip().lower() if token.lower() in {"and", "or"} else token.strip()
        for token in _GPR_TOKEN_PATTERN.findall(normalised_gpr_rule)
        if token.strip()
    ]
    if not tokens:
        raise ValueError("No tokens parsed from GPR rule.")
    return tuple(tokens)


def _parse_or_expression(cursor: _TokenCursor) -> tuple[_GPRNode, _TokenCursor]:
    """Generated: validation needed.

    Description:
        Parse the `or` precedence level (lowest) from the token stream.
        Chained operands are combined left-associatively into ``("OR", left,
        right)`` nodes.

    Args:
        cursor (_TokenCursor): Parsing cursor.

    Returns:
        tuple[_GPRNode, _TokenCursor]: Parsed node and updated cursor.
    """
    left_node, cursor = _parse_and_expression(cursor)
    while cursor.peek() == "or":
        _, cursor = cursor.advance()
        right_node, cursor = _parse_and_expression(cursor)
        left_node = ("OR", left_node, right_node)
    return left_node, cursor


def _parse_and_expression(cursor: _TokenCursor) -> tuple[_GPRNode, _TokenCursor]:
    """Generated: validation needed.

    Description:
        Parse the `and` precedence level (binds tighter than `or`) from the
        token stream. Chained operands are combined left-associatively into
        ``("AND", left, right)`` nodes.

    Args:
        cursor (_TokenCursor): Parsing cursor.

    Returns:
        tuple[_GPRNode, _TokenCursor]: Parsed node and updated cursor.
    """
    left_node, cursor = _parse_primary_expression(cursor)
    while cursor.peek() == "and":
        _, cursor = cursor.advance()
        right_node, cursor = _parse_primary_expression(cursor)
        left_node = ("AND", left_node, right_node)
    return left_node, cursor


def _parse_primary_expression(cursor: _TokenCursor) -> tuple[_GPRNode, _TokenCursor]:
    """Generated: validation needed.

    Description:
        Parse one primary expression: a gene token or a parenthesized
        subtree.

    Args:
        cursor (_TokenCursor): Parsing cursor.

    Returns:
        tuple[_GPRNode, _TokenCursor]: Parsed node and updated cursor.

    Raises:
        ValueError: When the stream ends early, parentheses are mismatched, or
            an operator / closing parenthesis appears where an operand is
            expected.
    """
    token = cursor.peek()
    if token is None:
        raise ValueError("Unexpected end of GPR expression.")

    if token == "(":
        _, cursor = cursor.advance()
        nested_node, cursor = _parse_or_expression(cursor)
        # advance() itself raises if the stream ends before the closing ")".
        closing_token, cursor = cursor.advance()
        if closing_token != ")":
            raise ValueError("Unmatched parenthesis in GPR rule.")
        return nested_node, cursor

    if token in {"and", "or", ")"}:
        raise ValueError(f"Invalid token placement in GPR rule: '{token}'.")

    gene_token, cursor = cursor.advance()
    return gene_token, cursor


def _expand_tree_to_gene_ifps(gpr_tree: _GPRNode) -> tuple[tuple[str, ...], ...]:
    """Generated: validation needed.

    Description:
        Expand a parsed GPR tree into `AND`-only gene groups (disjunctive
        normal form). An ``OR`` node concatenates the alternatives of its
        children. Any other operator is treated as ``AND`` and pairs every
        left alternative with every right alternative.

    Args:
        gpr_tree (_GPRNode): Parsed GPR node.

    Returns:
        tuple[tuple[str, ...], ...]: Expanded IFPs, possibly containing
        duplicates.
    """
    if isinstance(gpr_tree, str):
        return ((gpr_tree,),)

    operator, left_node, right_node = gpr_tree
    left_ifps = _expand_tree_to_gene_ifps(left_node)
    right_ifps = _expand_tree_to_gene_ifps(right_node)

    if operator == "OR":
        return left_ifps + right_ifps

    return tuple(
        left_ifp + right_ifp for left_ifp in left_ifps for right_ifp in right_ifps
    )


def _deduplicate_ifps(gene_ifps: Sequence[tuple[str, ...]]) -> tuple[tuple[str, ...], ...]:
    """Generated: validation needed.

    Description:
        Canonicalise and deduplicate IFPs. Within each IFP repeated genes are
        removed and the remaining genes are sorted, so the same set of genes
        always yields the same tuple regardless of the order written in the
        rule (``a and b`` equals ``b and a``). IFPs that become identical are
        kept once, in order of first appearance.

    Args:
        gene_ifps (Sequence[tuple[str, ...]]): Candidate IFPs.

    Returns:
        tuple[tuple[str, ...], ...]: Deduplicated IFPs with sorted genes.
    """
    seen_ifps: set[tuple[str, ...]] = set()
    deduplicated_ifps: list[tuple[str, ...]] = []
    for gene_ifp in gene_ifps:
        # Sorting makes the tuple a canonical key for the gene set; without
        # it, order variants of one complex would count as distinct IFPs.
        canonical_ifp = tuple(sorted(set(gene_ifp)))
        if canonical_ifp not in seen_ifps:
            seen_ifps.add(canonical_ifp)
            deduplicated_ifps.append(canonical_ifp)
    return tuple(deduplicated_ifps)