Source code for metapub.clinvarvariant

"""metapub.clinvarvariant -- ClinVarVariant class instantiated by supplying ESummary XML string."""

import logging
from datetime import datetime
from typing import Optional, Literal
from dataclasses import dataclass

from lxml import etree

from .base import MetaPubObject
from .exceptions import MetaPubError, BaseXMLError

#TODO: Logging

# See: https://www.ncbi.nlm.nih.gov/clinvar/docs/clinsig/
# All possible clinical significance classes a variant may be classified as
# by a submitter.
# 
# NOTE: here we represent clinical significance classes in lowercase.
ClinSig = Literal[
    # Core germline pathogenicity tiers
    "pathogenic",
    "likely pathogenic",
    "uncertain significance",
    "likely benign",
    "benign",
    "conflicting interpretations",
    # Non-pathogenicity classes
    "drug response",
    "risk factor",
    "association",
    "protective",
    "other",
    # Low-penetrance and risk-allele classes
    "likely pathogenic, low penetrance",
    "pathogenic, low penetrance",
    "uncertain risk allele",
    "likely risk allele",
    "established risk allele",
    "affects",
    "conflicting data from submitters",
    "not provided",
    # Sub-tiers of "uncertain significance"
    "vus-high",
    "vus-mid",
    "vus-low",
]
# Possible types of IDs a user may supply to initialize a variant.
IdLocations = Literal["clinvar", "entrez"]

@dataclass
class PathogenicSummary:
    """Aggregate of per-submitter germline classifications for one variant.

    Built by ``ClinVarVariant._get_pathogenic_summary``.
    """

    # Number of counted assertions per (lowercased) clinical significance.
    counts: dict[ClinSig, int]
    # Number of assertions that contributed to ``counts``.
    total_submitters: int
    # The single most frequent classification, or None when there is a tie,
    # nothing was counted, or the record is flagged as conflicting.
    consensus: Optional[ClinSig]
    # True when the record-level clinical significance contains "conflicting".
    conflicting: bool
    # Record-level review status text, if any.
    review_status: Optional[str]
class ClinVarVariant(MetaPubObject):
    """Parsed representation of one ClinVar variation record.

    Instantiate with the record's XML as a string. Two document layouts are
    recognised:

    * the VCV layout, whose root tag is ``ClinVarResult-Set`` and which
      carries a ``VariationArchive`` element, and
    * the older layout rooted at ``VariationReport``.

    Attributes marked "new in VCV format" in ``__init__`` are ``None`` (or an
    empty list) when the older layout was supplied.
    """

    def __init__(self, xmlstr, *args, **kwargs):
        """Parse ``xmlstr`` and populate all variant attributes.

        Raises:
            BaseXMLError: the XML is malformed, empty, or is a VCV document
                without a ``VariationArchive`` element.
            MetaPubError: the document contains an explicit ``error`` element.
        """
        # Try new VCV format first, fall back to old format for backwards
        # compatibility.
        try:
            # Parse the full document once just to inspect the root tag.
            dom = etree.fromstring(xmlstr)
            if dom.tag == 'ClinVarResult-Set':
                # New VCV format: keep the full document as ``self.content``.
                self._is_vcv_format = True
                # NOTE(review): args/kwargs are forwarded as two positional
                # values rather than unpacked; kept as-is because the base
                # class signature is not visible here -- confirm.
                super().__init__(xmlstr, None, args, kwargs)
                self.variation_archive = self.content.find('VariationArchive')
                if self.variation_archive is None:
                    # An empty <set/> means the requested ID matched nothing.
                    set_elem = self.content.find('set')
                    if set_elem is not None and len(set_elem) == 0:
                        # This will trigger the "Invalid ClinVar Variation ID" error
                        raise BaseXMLError('Empty XML document')
                    raise BaseXMLError('No VariationArchive found in VCV format')
            else:
                # Old format
                self._is_vcv_format = False
                super().__init__(xmlstr, 'VariationReport', args, kwargs)
                self.variation_archive = None
        except (etree.XMLSyntaxError, BaseXMLError) as e:
            # Normalise every parse failure to BaseXMLError, keeping the cause.
            raise BaseXMLError('Invalid XML document: %s' % str(e)) from e

        if self.content is None:
            raise BaseXMLError('Empty XML document')

        error = self._get('error')
        if error:
            raise MetaPubError('Supplied XML for ClinVarVariant contained explicit error: %s' % error)

        # VariationReport basic details
        self.variation_id = self._get_variation_id()
        self.variation_name = self._get_variation_name()
        self.variation_type = self._get_variation_type()
        self.date_created = self._get_date_created()
        self.date_last_updated = self._get_date_last_updated()
        self.submitter_count = self._get_submitter_count()

        # Species Info
        self.species = self._get_species()
        self.taxonomy_id = self._get_taxonomy_id()

        # Gene List
        self.genes = self._get_gene_list()

        # Allele Info
        self.cytogenic_location = self._get_cytogenic_location()
        self.sequence_locations = self._get_sequence_locations()
        self.hgvs = self._get_hgvs_list()
        self.xrefs = self._get_xref_list()
        self.molecular_consequences = self._get_molecular_consequence_list()
        self.allele_frequencies = self._get_allele_frequency_list()

        # Clinical significance and classifications (new in VCV format)
        self.clinical_significance = self._get_clinical_significance()
        self.review_status = self._get_review_status()
        self.date_last_evaluated = self._get_date_last_evaluated()
        self.number_of_submissions = self._get_number_of_submissions()
        self.number_of_submitters = self._get_number_of_submitters()
        # Reads clinical_significance and review_status, so must come after them.
        self.pathogenic_summary = self._get_pathogenic_summary()

        # VCV record metadata (new in VCV format)
        self.vcv_accession = self._get_vcv_accession()
        self.record_type = self._get_record_type()
        self.most_recent_submission = self._get_most_recent_submission()

        # Associated conditions/diseases (new in VCV format)
        self.associated_conditions = self._get_associated_conditions()

        # Enhanced molecular consequences (new in VCV format)
        self.molecular_consequences_detailed = self._get_molecular_consequences_detailed()

        # Enhanced sequence details (new in VCV format)
        self.sequence_details = self._get_sequence_details()

        # Enhanced gene information (new in VCV format)
        self.gene_dosage_info = self._get_gene_dosage_info()

        # Protein change summary (new in VCV format)
        self.protein_change = self._get_protein_change()

        # Clinical assertions (new in VCV format)
        self.clinical_assertions = self._get_clinical_assertions()

        # Enhanced citations (new in VCV format)
        self.citations = self._get_citations()

    # Observations

    def to_dict(self):
        """Return a copy of this object's attributes as a dictionary.

        Only the parsed ``content`` tree is removed; every other instance
        attribute is returned as stored.
        """
        outd = self.__dict__.copy()
        outd.pop('content')
        return outd

    ### Private parsing helpers

    @staticmethod
    def _parse_ymd_date(datestr):
        """Parse a ``YYYY-MM-DD`` string; return None if empty or malformed."""
        if not datestr:
            return None
        try:
            return datetime.strptime(datestr, '%Y-%m-%d')
        except ValueError:
            return None

    @staticmethod
    def _int_or_none(value):
        """Convert ``value`` to int; return None if empty or not numeric."""
        try:
            return int(value) if value else None
        except (TypeError, ValueError):
            return None

    @staticmethod
    def _text_or_none(elem):
        """Return ``elem.text``, or None when ``elem`` itself is None."""
        return elem.text if elem is not None else None

    @staticmethod
    def _child_attribute_dicts(parent):
        """Return one attribute dict per child of ``parent`` ([] if None).

        Iterates the element directly instead of calling the deprecated
        ``getchildren()``.
        """
        if parent is None:
            return []
        return [dict(child.items()) for child in parent]

    def _find_simple_allele(self):
        """Return VariationArchive/ClassifiedRecord/SimpleAllele (VCV only)."""
        return self.variation_archive.find('ClassifiedRecord/SimpleAllele')

    def _find_germline_classification(self):
        """Return the aggregate GermlineClassification element, or None (VCV only)."""
        classifications = self.variation_archive.find('.//Classifications')
        if classifications is None:
            return None
        return classifications.find('GermlineClassification')

    def _basic_attribute(self, name):
        """Read a top-level record attribute from whichever root applies."""
        root = self.variation_archive if self._is_vcv_format else self.content
        return root.get(name)

    ### HGVS string convenience properties

    def _get_hgvs_or_empty_list(self, hgvsdict):
        """Return ``['<accession>:<change>']`` for one HGVS dict, else ``[]``.

        Nucleotide keys (``AccessionVersion``/``Change``) take precedence over
        the VCV protein keys (``ProteinAccessionVersion``/``ProteinChange``).
        """
        # example of missing Change: ClinVar ID 409
        # example of missing AccessionVersion: ClinVar ID 11344
        key_pairs = (
            ('AccessionVersion', 'Change'),
            ('ProteinAccessionVersion', 'ProteinChange'),
        )
        for accession_key, change_key in key_pairs:
            if accession_key in hgvsdict and change_key in hgvsdict:
                return [hgvsdict[accession_key] + ':' + hgvsdict[change_key]]
        return []

    def _hgvs_strings_of_type(self, type_fragment):
        """Collect HGVS strings whose ``Type`` contains ``type_fragment``."""
        strlist = []
        for hgvsdict in self.hgvs:
            # Entries lacking a Type attribute are skipped rather than raising.
            if type_fragment in hgvsdict.get('Type', ''):
                strlist.extend(self._get_hgvs_or_empty_list(hgvsdict))
        return strlist

    @property
    def hgvs_c(self):
        """ Returns a list of all coding HGVS strings from the Allele data. """
        return self._hgvs_strings_of_type('coding')

    @property
    def hgvs_g(self):
        """ Returns a list of all genomic HGVS strings from the Allele data. """
        return self._hgvs_strings_of_type('genomic')

    @property
    def hgvs_p(self):
        """ Returns a list of all protein effect HGVS strings from the Allele data. """
        strlist = []
        for hgvsdict in self.hgvs:
            hgvs_type = hgvsdict.get('Type', '')
            if 'protein' in hgvs_type:
                # Dedicated protein type entries
                strlist.extend(self._get_hgvs_or_empty_list(hgvsdict))
            elif 'coding' in hgvs_type and 'ProteinAccessionVersion' in hgvsdict:
                # Protein changes attached to coding entries
                change = hgvsdict.get('ProteinChange')
                if change is not None:
                    strlist.append(hgvsdict['ProteinAccessionVersion'] + ':' + change)
        return strlist

    ### VariationReport basic info

    def _get_variation_id(self):
        return self._basic_attribute('VariationID')

    def _get_variation_name(self):
        return self._basic_attribute('VariationName')

    def _get_variation_type(self):
        return self._basic_attribute('VariationType')

    def _get_date_created(self):
        """Return ``DateCreated`` as a datetime, or None if absent/malformed."""
        return self._parse_ymd_date(self._basic_attribute('DateCreated'))

    def _get_date_last_updated(self):
        """Return ``DateLastUpdated`` as a datetime, or None if absent/malformed."""
        return self._parse_ymd_date(self._basic_attribute('DateLastUpdated'))

    def _get_submitter_count(self):
        """Return the submitter count as int, or None if absent/non-numeric."""
        if self._is_vcv_format:
            count_attr = self.variation_archive.get('NumberOfSubmitters')
        else:
            count_attr = self.content.get('SubmitterCount')
        return self._int_or_none(count_attr)

    def _get_species(self):
        if self._is_vcv_format:
            return self._text_or_none(self.variation_archive.find('Species'))
        return self._get('Species')

    def _get_taxonomy_id(self):
        if self._is_vcv_format:
            # In VCV format, taxonomy ID is not typically provided in the same way
            return None
        species_elem = self.content.find('Species')
        return species_elem.get('TaxonomyId') if species_elem is not None else None

    #### GENE LIST

    def _get_gene_list(self):
        """ Returns a list of dictionaries representing each gene associated with this variant.

        Keys in gene dictionary vary by format but include:
        'Symbol', 'FullName', 'GeneID', 'HGNC_ID', etc.
        """
        if not self._is_vcv_format:
            # Old format: VariationReport/GeneList/Gene
            return self._child_attribute_dicts(self.content.find('GeneList'))

        # In VCV format: VariationArchive/ClassifiedRecord/SimpleAllele/GeneList/Gene
        genes = []
        simple_allele = self._find_simple_allele()
        genelist = simple_allele.find('GeneList') if simple_allele is not None else None
        if genelist is None:
            return genes
        for gene_elem in genelist.findall('Gene'):
            gene_dict = dict(gene_elem.items())
            # Also capture OMIM and Property elements as text
            for tag in ('OMIM', 'Property'):
                child = gene_elem.find(tag)
                if child is not None:
                    gene_dict[tag] = child.text
            genes.append(gene_dict)
        return genes

    ### ALLELE INFORMATION

    def _get_allele_id(self):
        """Return the ``AlleleID`` attribute, or None if the allele is missing.

        Not called from ``__init__`` in this class.
        """
        if self._is_vcv_format:
            allele_elem = self._find_simple_allele()
        else:
            allele_elem = self.content.find('Allele')
        return allele_elem.get('AlleleID') if allele_elem is not None else None

    def _get_cytogenic_location(self):
        if not self._is_vcv_format:
            return self._get('Allele/CytogeneticLocation')
        simple_allele = self._find_simple_allele()
        if simple_allele is None:
            return None
        return self._text_or_none(simple_allele.find('Location/CytogeneticLocation'))

    def _get_sequence_locations(self):
        """Return one attribute dict per SequenceLocation element."""
        if self._is_vcv_format:
            simple_allele = self._find_simple_allele()
            if simple_allele is None:
                return []
            elems = simple_allele.findall('Location/SequenceLocation')
        else:
            elems = self.content.findall('Allele/SequenceLocation')
        return [dict(elem.items()) for elem in elems]

    def _get_hgvs_list(self):
        """Return one dict per HGVS entry.

        In VCV format the element's own attributes are augmented with
        ``AccessionVersion``/``Change`` (from NucleotideExpression) and
        ``ProteinAccessionVersion``/``ProteinChange`` (from ProteinExpression)
        when those children exist.
        """
        if not self._is_vcv_format:
            # Old format: VariationReport/Allele/HGVSlist
            return self._child_attribute_dicts(self.content.find('Allele/HGVSlist'))

        # In VCV format: VariationArchive/ClassifiedRecord/SimpleAllele/HGVSlist/HGVS
        hgvs = []
        simple_allele = self._find_simple_allele()
        hgvs_list = simple_allele.find('HGVSlist') if simple_allele is not None else None
        if hgvs_list is None:
            return hgvs
        for hgvs_elem in hgvs_list.findall('HGVS'):
            hgvs_dict = dict(hgvs_elem.items())

            # Get NucleotideExpression details
            nuc_expr = hgvs_elem.find('NucleotideExpression')
            if nuc_expr is not None:
                hgvs_dict['AccessionVersion'] = nuc_expr.get('sequenceAccessionVersion', '')
                hgvs_dict['Change'] = nuc_expr.get('change', '')

            # Get ProteinExpression details if available
            prot_expr = hgvs_elem.find('ProteinExpression')
            if prot_expr is not None:
                hgvs_dict['ProteinAccessionVersion'] = prot_expr.get('sequenceAccessionVersion', '')
                hgvs_dict['ProteinChange'] = prot_expr.get('change', '')

            hgvs.append(hgvs_dict)
        return hgvs

    def _allele_child_list(self, list_tag):
        """Return attribute dicts for the children of the allele's ``list_tag``."""
        if self._is_vcv_format:
            simple_allele = self._find_simple_allele()
            if simple_allele is None:
                return []
            container = simple_allele.find(list_tag)
        else:
            container = self.content.find('Allele/' + list_tag)
        return self._child_attribute_dicts(container)

    def _get_xref_list(self):
        return self._allele_child_list('XRefList')

    def _get_molecular_consequence_list(self):
        return self._allele_child_list('MolecularConsequenceList')

    def _get_allele_frequency_list(self):
        return self._allele_child_list('AlleleFrequencyList')

    ### NEW VCV FORMAT ENHANCEMENTS ###

    def _get_clinical_significance(self) -> Optional[ClinSig]:
        """Get the clinical significance classification (e.g., 'pathogenic', 'benign')

        A list of all significance classes is available here:
        https://www.ncbi.nlm.nih.gov/clinvar/docs/clinsig/

        **Note**: in this version of Metapub, clinical significance is
        represented in lowercase. Older versions did NOT do this, so make sure
        to update your code if necessary!

        Returns None for the old format, or when the Description element is
        missing or has no text.
        """
        if not self._is_vcv_format:
            return None
        # Look in Classifications/GermlineClassification/Description
        germline_class = self._find_germline_classification()
        if germline_class is None:
            return None
        description = self._text_or_none(germline_class.find('Description'))
        # An empty <Description/> has text None; don't call .lower() on it.
        return description.lower() if description is not None else None

    def _get_review_status(self):
        """Get the review status (e.g., 'criteria provided, multiple submitters, no conflicts')"""
        if not self._is_vcv_format:
            return None
        germline_class = self._find_germline_classification()
        if germline_class is None:
            return None
        return self._text_or_none(germline_class.find('ReviewStatus'))

    def _get_date_last_evaluated(self):
        """Get the date when the clinical significance was last evaluated"""
        if not self._is_vcv_format:
            return None
        germline_class = self._find_germline_classification()
        if germline_class is None:
            return None
        return self._parse_ymd_date(germline_class.get('DateLastEvaluated'))

    def _get_number_of_submissions(self):
        """Get the number of submissions for this variant"""
        if not self._is_vcv_format:
            return None
        return self._int_or_none(self.variation_archive.get('NumberOfSubmissions'))

    def _get_number_of_submitters(self):
        """Get the number of submitters for this variant"""
        if not self._is_vcv_format:
            return None
        return self._int_or_none(self.variation_archive.get('NumberOfSubmitters'))

    def _get_vcv_accession(self):
        """Get the VCV accession number (e.g., 'VCV000012397')"""
        if not self._is_vcv_format:
            return None
        return self.variation_archive.get('Accession')

    def _get_record_type(self):
        """Get the record type (e.g., 'classified')"""
        if not self._is_vcv_format:
            return None
        return self.variation_archive.get('RecordType')

    def _get_most_recent_submission(self):
        """Get the date of the most recent submission"""
        if not self._is_vcv_format:
            return None
        return self._parse_ymd_date(self.variation_archive.get('MostRecentSubmission'))

    def _get_associated_conditions(self):
        """Get list of associated conditions/diseases with their MedGen IDs"""
        if not self._is_vcv_format:
            return []

        conditions = []
        rcv_list = self.variation_archive.find('.//RCVList')
        if rcv_list is None:
            return conditions
        for rcv in rcv_list.findall('RCVAccession'):
            condition_list = rcv.find('ClassifiedConditionList')
            if condition_list is None:
                continue
            for condition in condition_list.findall('ClassifiedCondition'):
                cond_dict = {
                    'name': condition.text,
                    'medgen_id': condition.get('ID'),
                    'database': condition.get('DB'),
                    'rcv_accession': rcv.get('Accession'),
                    'rcv_title': rcv.get('Title')
                }
                # Avoid duplicates
                if cond_dict not in conditions:
                    conditions.append(cond_dict)
        return conditions

    def _get_molecular_consequences_detailed(self):
        """Get detailed molecular consequences with Sequence Ontology terms"""
        if not self._is_vcv_format:
            return []

        consequences = []
        simple_allele = self._find_simple_allele()
        hgvs_list = simple_allele.find('HGVSlist') if simple_allele is not None else None
        if hgvs_list is None:
            return consequences
        for hgvs_elem in hgvs_list.findall('HGVS'):
            for mol_cons in hgvs_elem.findall('MolecularConsequence'):
                consequence = {
                    'type': mol_cons.get('Type'),
                    'so_id': mol_cons.get('ID'),
                    'database': mol_cons.get('DB')
                }
                # The same consequence may repeat across HGVS entries.
                if consequence not in consequences:
                    consequences.append(consequence)
        return consequences

    def _get_sequence_details(self):
        """Get enhanced sequence location details including VCF coordinates"""
        if not self._is_vcv_format:
            return []

        numeric_fields = ('start', 'stop', 'display_start', 'display_stop',
                          'variantLength', 'positionVCF')
        details = []
        simple_allele = self._find_simple_allele()
        location = simple_allele.find('Location') if simple_allele is not None else None
        if location is None:
            return details
        for seq_loc in location.findall('SequenceLocation'):
            detail = dict(seq_loc.items())
            # Convert numeric fields; leave the original string if not an int.
            for field in numeric_fields:
                if field in detail:
                    try:
                        detail[field] = int(detail[field])
                    except (ValueError, TypeError):
                        pass
            details.append(detail)
        return details

    def _get_pathogenic_summary(self) -> Optional[PathogenicSummary]:
        """
        Return the aggregation of per-submitter clinical germline significance
        classifications into a readable summary.

        Returns a dataclass in the following format:
        {
            counts: {
                'pathogenic': 3,
                'likely pathogenic': 1,
                ...
            },
            total_submitters: 4,
            consensus: 'pathogenic',
            conflicting: False,
            review_status: 'criteria provided, multiple submitters, no conflicts'
        }

        Assertions whose classification is empty or "not provided" are not
        counted. Returns None for the old format.
        """
        if not self._is_vcv_format:
            return None

        counts: dict[ClinSig, int] = {}
        total = 0

        assertion_list = self.variation_archive.find(".//ClinicalAssertionList")
        if assertion_list is not None:
            for assertion in assertion_list.findall('ClinicalAssertion'):
                # TODO: ContributesToAggregateClassification doesn't seem to be inside ClinicalAssertion.
                classification_info = assertion.find("Classification")
                if classification_info is None:
                    continue

                # TODO: support OncogenicityClassification or SomaticClinicalImpact
                germline = classification_info.find("GermlineClassification")
                if germline is None:
                    continue

                key = (germline.text or "").strip().lower()
                # Skip blank (including whitespace-only) and unprovided classifications.
                if not key or key == "not provided":
                    continue

                counts[key] = counts.get(key, 0) + 1
                total += 1

        # Determine consensus: only a unique top count qualifies.
        consensus = None
        if counts:
            max_count = max(counts.values())
            top_counts: list[ClinSig] = [k for k, v in counts.items() if v == max_count]
            if len(top_counts) == 1:
                consensus = top_counts[0]

        conflicting = "conflicting" in (self.clinical_significance or "").lower()

        return PathogenicSummary(
            counts=counts,
            total_submitters=total,
            consensus=consensus if not conflicting else None,
            conflicting=conflicting,
            review_status=self.review_status
        )

    def _get_gene_dosage_info(self):
        """Get gene dosage sensitivity information"""
        if not self._is_vcv_format:
            return []

        dosage_info = []
        simple_allele = self._find_simple_allele()
        gene_list = simple_allele.find('GeneList') if simple_allele is not None else None
        if gene_list is None:
            return dosage_info
        for gene in gene_list.findall('Gene'):
            gene_dosage = {'symbol': gene.get('Symbol')}
            for tag, key in (('Haploinsufficiency', 'haploinsufficiency'),
                             ('Triplosensitivity', 'triplosensitivity')):
                elem = gene.find(tag)
                if elem is not None:
                    gene_dosage[key] = {
                        'classification': elem.text,
                        'last_evaluated': elem.get('last_evaluated'),
                        'clingen_url': elem.get('ClinGen')
                    }
            if len(gene_dosage) > 1:  # Only add if we have dosage info
                dosage_info.append(gene_dosage)
        return dosage_info

    def _get_protein_change(self):
        """Get the simple protein change notation (e.g., 'R611Q')"""
        if not self._is_vcv_format:
            return None
        simple_allele = self._find_simple_allele()
        if simple_allele is None:
            return None
        return self._text_or_none(simple_allele.find('ProteinChange'))

    def _get_citations(self):
        """Get citation information from the clinical classifications"""
        if not self._is_vcv_format:
            return []

        citations = []
        germline_class = self._find_germline_classification()
        if germline_class is None:
            return citations
        for citation in germline_class.findall('Citation'):
            citation_info = {
                'type': citation.get('Type'),
                'ids': [
                    {'source': id_elem.get('Source'), 'id': id_elem.text}
                    for id_elem in citation.findall('ID')
                ]
            }
            # Also check for URLs
            url_elem = citation.find('URL')
            if url_elem is not None:
                citation_info['url'] = url_elem.text
            citations.append(citation_info)
        return citations

    def _get_clinical_assertions(self):
        """Get individual clinical assertions from submitters"""
        if not self._is_vcv_format:
            return []

        assertions = []
        assertion_list = self.variation_archive.find('.//ClinicalAssertionList')
        if assertion_list is None:
            return assertions

        for assertion in assertion_list.findall('ClinicalAssertion'):
            assertion_info = {
                'id': assertion.get('ID'),
                'submission_date': assertion.get('SubmissionDate'),
                'date_created': assertion.get('DateCreated'),
                'date_last_updated': assertion.get('DateLastUpdated')
            }

            # Get submitter information
            clinvar_accession = assertion.find('ClinVarAccession')
            if clinvar_accession is not None:
                assertion_info.update({
                    'accession': clinvar_accession.get('Accession'),
                    'submitter_name': clinvar_accession.get('SubmitterName'),
                    'organization_category': clinvar_accession.get('OrganizationCategory')
                })

            # Get classification
            classification = assertion.find('Classification')
            if classification is not None:
                class_info = assertion_info['classification'] = {}

                review_status = classification.find('ReviewStatus')
                if review_status is not None:
                    class_info['review_status'] = review_status.text

                germline_class = classification.find('GermlineClassification')
                if germline_class is not None:
                    class_info['clinical_significance'] = germline_class.text

                date_evaluated = classification.get('DateLastEvaluated')
                if date_evaluated:
                    class_info['date_last_evaluated'] = date_evaluated

            # Get observed data
            observed_list = assertion.find('ObservedInList')
            if observed_list is not None:
                assertion_info['observed_in'] = [
                    self._parse_observed_in(observed)
                    for observed in observed_list.findall('ObservedIn')
                ]

            assertions.append(assertion_info)
        return assertions

    def _parse_observed_in(self, observed):
        """Build the dict for one ObservedIn element of a clinical assertion."""
        obs_info = {}

        sample = observed.find('Sample')
        if sample is not None:
            obs_info['sample'] = {
                'origin': self._text_or_none(sample.find('Origin')),
                'species': self._text_or_none(sample.find('Species')),
                'affected_status': self._text_or_none(sample.find('AffectedStatus'))
            }

            # Number tested: int when possible, otherwise the raw text.
            num_tested = sample.find('NumberTested')
            if num_tested is not None:
                try:
                    obs_info['sample']['number_tested'] = int(num_tested.text)
                except (ValueError, TypeError):
                    obs_info['sample']['number_tested'] = num_tested.text

        method = observed.find('Method')
        if method is not None:
            method_type = method.find('MethodType')
            if method_type is not None:
                obs_info['method_type'] = method_type.text

        return obs_info
### OBSERVATIONS