Source code for anchorna.util

# (C) 2024, Tom Eulenfeld, MIT license
"""
`.Fluke`, `.Anchor` and `.AnchorList` classes providing useful attributes and methods
"""

import collections
from copy import deepcopy
import logging
from statistics import median
from warnings import warn

from sugar.core.meta import Attr
from sugar.data import submat

log = logging.getLogger('anchorna')


[docs] def corrscore(seq1, seq2, gap='-', sm=None): """Similarity score between two words""" if sm is None: from sugar.data import submat sm = submat('blosum62') return sum(sm[nt1][nt2] for nt1, nt2 in zip(seq1, seq2) if nt1 != gap and nt2 != gap)
[docs] class Fluke(Attr): """ A fluke is a word position on a single sequence and part of an `Anchor` Properties: id, seqid, score, median_score, start, stop, offset, word, poor. """ @property def len(self): return self.stop - self.start @property def strand(self): return self.get('strand', '+')
[docs] def _apply_mode(self, mode): """ Transform index, allowed modes are ``'aa', 'cds', 'nt'`` """ fluke = self assert mode in ('aa', 'cds', 'nt') o = fluke.offset i1 = fluke.start i2 = fluke.stop assert i1 < i2 if mode in ('aa', None): return i1, i2 elif mode == 'cds': return 3 * i1, 3 * i2 elif mode == 'nt' and fluke.strand == '-': return o - 3 * i2, o - 3 * i1 else: return o + 3 * i1, o + 3 * i2
[docs] class Anchor(collections.UserList): """ A single Anchor is a list of `Flukes <Fluke>`, one for each sequence Some properties: data (list of flukes), id, gseqid, guide, minscore (aka score), maxscore, medscore. """ def __init__(self, data=None, **kw): super().__init__(data) for k, v in kw.items(): setattr(self, k, v) def __str__(self): return self.tostr() @property def id(self): return f'A {self.guide.start}+{self.guide.len}' @property def strand(self): strands = {f.strand for f in self} assert len(strands) <= 1 if len(strands) == 1: return strands.pop() @strand.setter def strand(self, value): for f in self: f.strand = value def __hash__(self): return hash((self.guide.start, self.guide.len, self.guide.offset))
[docs] def todict_seqid(self): return {f.seqid: f for f in self}
@property def sid(self): return self.todict_seqid() @property def seqids(self): return [f.seqid for f in self]
[docs] def tostr(self, i='', verbose=False, mode='aa'): start, stop = self.guide._apply_mode(mode) poor = sum(f.poor for f in self) out = f'A{i} {start}+{stop-start} minscore {self.minscore} poor {poor} {self.guide.word}' if not verbose: return out flukes = [f' F{j} {f._apply_mode(mode)[0]} medscore {f.median_score} {f.word} {f.seqid}' + ' (poor)' * f.poor for j, f in enumerate(self)] return '\n'.join([out]+flukes)
[docs] def sort(self, key=None, **kw): if key is None: def key(f): return (False, '') if f.seqid == self.gseqid else (f.poor, f.seqid) self.data = sorted(self.data, key=key, **kw) return self
@property def guide(self): return self.sid[self.gseqid] @property def minscore(self): return min(f.median_score for f in self) @property def maxscore(self): return max(f.median_score for f in self) @property def medscore(self): return median(f.median_score for f in self) @property def poor_flukes(self): data = [f for f in self if f.poor] return Anchor(data, gseqid=self.gseqid) @property def good_flukes(self): data = [f for f in self if not f.poor] return Anchor(data, gseqid=self.gseqid)
[docs] def nicely_overlaps_with(self, a2): a1 = self.good_flukes a2 = a2.good_flukes return (max(a1.guide.start, a2.guide.start) <= min(a1.guide.stop, a2.guide.stop) and set(a1.seqids) == set(a2.seqids) and all((f1.start - f2.start == a1.guide.start - a2.guide.start and f1.poor == f2.poor) for f1, f2 in zip(a1.sort(), a2.sort())))
[docs] def join_with(self, a2, scoring=None): a1 = self if not a1.nicely_overlaps_with(a2): raise ValueError('Cannot join anchors which do not overlap') if a1.guide.start <= a2.guide.start and a1.guide.stop >= a2.guide.stop: # a2 is contained in a1 return a1 elif a1.guide.start >= a2.guide.start and a1.guide.stop <= a2.guide.stop: # a1 is contained in a2 return a2 else: # overlap flukes = [] correctl = None for f1, f2 in zip(a1.sort(), a2.sort()): assert f1.seqid == f2.seqid assert f1.poor == f2.poor assert f1.offset == f2.offset if f1.start > f2.start: f1, f2 = f2, f1 start = f1.start stop = f2.stop l = stop - start overlaplen = f1.len + f2.len - l if not f1.poor: if correctl is None: correctl = l assert correctl == l if correctl != l: # there is a gap between the flukes! # that might only be the case for "poor" flukes below the threshold # adapt the word assert f1.poor and f2.poor l = correctl overlaplen = f1.len + f2.len - l if f1.score >= f2.score: stop = start + l else: start = stop - l nword = f1.word + f2.word[overlaplen:] if f1.start <= f2.start else f2.word + f1.word[overlaplen:] assert len(nword) == l fluke = Fluke(seqid=f1.seqid, score=None, median_score=None, start=start, stop=stop, offset=f1.offset, word=nword, poor=f1.poor) flukes.append(fluke) anchor = Anchor(flukes, gseqid=a1.gseqid) if scoring: anchor._calculate_fluke_scores(scoring) return anchor
[docs] def _calculate_fluke_scores(self, scoring): words = {fluke.word for fluke in self if not fluke.poor} for fluke in self: scores = [corrscore(fluke.word, w, sm=submat(scoring)) for w in words] fluke.score = max(scores) fluke.median_score = median(scores)
[docs] def contradicts(self, a2, aggressive=True): a1 = self if a1.guide.start > a2.guide.start: a1, a2 = a2, a1 a1 = a1.good_flukes a2 = a2.good_flukes attr = 'stop' if aggressive else 'start' return not all(getattr(f1, attr) <= f2.start for f1, f2 in zip(a1.sort(), a2.sort()))
[docs] def _convert_nt(self): for fluke in self: start, stop = fluke._apply_mode('nt') fluke.start, fluke.stop, fluke.offset = start, stop, 0 fluke.strand = '+'
[docs] class AnchorList(collections.UserList): """ Collection of `Anchors <Anchor>` with useful methods """ def __init__(self, data=None, no_cds=False): super().__init__(data) if no_cds: self._no_cds = True def __str__(self): return self.tostr()
[docs] def _repr_pretty_(self, p, cycle): if cycle: p.text('...') else: p.text(str(self))
@property def no_cds(self): return getattr(self, '_no_cds', False) @no_cds.setter def no_cds(self, value): if value: self._no_cds = True else: raise ValueError('Not allowed to unset no_cds')
[docs] def tostr(self, verbose=False, mode='aa'): return '\n'.join(a.tostr(i=i, verbose=verbose, mode=mode) for i, a in enumerate(self))
[docs] def copy(self): return deepcopy(self)
[docs] def sort(self, key=lambda a: a.guide.start, **kw): self.data = sorted(self, key=key, **kw) return self
[docs] def merge_overlapping_anchors(self, scoring=None): """ Remove overlapping anchors, step B of ``anchorna go`` """ self.sort() already_merged = set() ndata = [] for i, a1 in enumerate(self.data): if a1 in already_merged: continue for a2 in self.data[i+1:]: if a2 in already_merged: continue if a1.guide.stop < a2.guide.start: break if a1.nicely_overlaps_with(a2): a1 = a1.join_with(a2, scoring=scoring) already_merged.add(a2) ndata.append(a1) self.data = ndata return self
[docs] def remove_contradicting_anchors(self, aggressive=True): """ Remove contradicting anchors, step C of ``anchorna go`` """ # The runtime of this method can be enhanced by using an interval tree or a nested containment list. # But this method is not the bottleneck at all. anchors = sorted(self, key=lambda a: a.minscore, reverse=True) remove_anchors = set() for i, a1 in enumerate(anchors): if a1 in remove_anchors: continue for a2 in anchors[i+1:]: if a2 in remove_anchors: continue assert a1 != a2 and a1.minscore >= a2.minscore if a1.contradicts(a2, aggressive=aggressive): log.debug(f'Remove anchor {a2.guide.start}+{a2.guide.len} with min score {a2.minscore}, ' f'keep anchor {a1.guide.start}+{a1.guide.len} with min score {a1.minscore}') remove_anchors.add(a2) self.data = [anchor for anchor in self if anchor not in remove_anchors] return AnchorList(remove_anchors, no_cds=self.no_cds).sort()
[docs] def convert2fts(self, **kw): """ Convert anchors to `~sugar.core.fts.FeatureList` object, see `.anchors2fts()` """ return anchors2fts(self, **kw)
[docs] def _convert_nt(self): for anchor in self: anchor._convert_nt() self.no_cds = True
[docs] def write(self, fname, **kw): """ Write anchors to GFF file, see `.write_anchors()` """ from anchorna.io import write_anchors return write_anchors(self, fname, **kw)
[docs] def anchors2fts(anchors, mode=None): """ Convert anchors to `~sugar.core.fts.FeatureList` object Write some important attributes to ``Feature.meta._gff`` metadata. :param mode: Optionally convert anchors to ``'nt'`` or ``'cds'`` indices """ from sugar import Feature, FeatureList fts = [] for i, a in enumerate(anchors): for j, f in enumerate(a.sort()): # fluke if a.gseqid == f.seqid: assert j == 0 ftype = 'anchor' name = f'A{i}' else: assert j > 0 ftype = 'fluke' name=f'A{i}_{f.seqid}' start, stop = f._apply_mode(mode) ft = Feature(ftype, start=start, stop=stop, strand=f.strand, meta=dict(name=name, seqid=f.seqid, score=f.score, _gff=Attr(source='anchorna', word=f.word, median_score=f.median_score))) if hasattr(f, 'offset'): ft.meta._gff.offset = f.offset if f.poor: ft.meta._gff.poor = 1 fts.append(ft) return FeatureList(fts)
[docs] def fts2anchors(fts, no_cds=False): """ Convert `~sugar.core.fts.FeatureList` object to anchors Read some important attributes from ``Feature.meta._gff`` metadata. """ anchors = [] flukes = [] gseqid = None for ft in fts: if ft.type == 'anchor': if len(flukes) > 0: anchors.append(Anchor(flukes, gseqid=gseqid)) flukes = [] gseqid = ft.seqid aname = ft.meta.name if ft.type in ('anchor', 'fluke'): if not ft.meta.name.startswith(aname): raise ValueError(f'Fluke with name {ft.meta.name} not part of anchor {aname}') start, stop = ft.locs.range fluke = Fluke(seqid=ft.seqid, score=ft.meta.score, start=start, stop=stop, strand=str(ft.loc.strand), word=ft.meta._gff.word, poor=hasattr(ft.meta._gff, 'poor'), median_score = float(ft.meta._gff.get('median_score', 1))) if hasattr(ft.meta._gff, 'offset'): fluke.offset = ft.meta._gff.offset flukes.append(fluke) else: warn(f'Cannot convert feature of type {ft.type}') if len(flukes) > 0: anchors.append(Anchor(flukes, gseqid=gseqid)) return AnchorList(anchors, no_cds=no_cds)