Source code for anchorna.io

# (C) 2024, Tom Eulenfeld, MIT license
"""
IO routines, see `.write_anchors()` and `.read_anchors()`
"""

import json
import logging
import sys

import matplotlib.colors as mcolors
from matplotlib.colors import to_hex, to_rgb
from sugar import read_fts

from anchorna.util import fts2anchors, Anchor, AnchorList, Fluke


log = logging.getLogger('anchorna')


[docs] def write_anchors(anchors, fname, mode=None): """ Write anchors to GFF file Offsets are stored as comments. :param str mode: if specified, transform indices and export to GFF file without the offsets stored as comments. The result is a file which should not be read in again with anchorna. """ from anchorna import __version__ fts = anchors.convert2fts(mode=mode or 'aa') offsets = {ft.seqid: ft.meta._gff.pop('offset') for ft in fts} offsets_header = ''.join(f'#offset {seqid} {offset}\n' for seqid, offset in offsets.items()) if mode is None: header_cds = '#no_cds\n' if anchors.no_cds else ( '# Indices are given for amino acids, the offset specifies the offset of index 0 from this file\n' '# to the beginning of the original sequence (in nucleotides).\n' ) header = ( f'#AnchoRNA anchor file\n' f'# written with AnchoRNA v{__version__}\n' + header_cds + offsets_header) else: header = f'# Anchors exported by AnchoRNA v{__version__} with mode {mode}\n' if fname is None: try: print(fts.tofmtstr('gff', header_sugar=False, header=header)) except BrokenPipeError: pass else: fts.write(fname, 'gff', header_sugar=False, header=header)
[docs] def _read_anchors(fname, check_header=True): """ Read anchors from GFF file Offsets are restored from comments. """ comments = [] fts = read_fts(fname, 'gff', comments=comments) offsets = {} no_cds = False for i, line in enumerate(comments): if i == 0: if check_header and not line.startswith('##gff-version 3'): raise IOError(f'{fname} not a valid GFF file') elif i == 1: if check_header and not line.startswith('#AnchoRNA'): raise IOError(f'{fname} not a valid anchor file') elif line.startswith('#no_cds'): no_cds = True elif line.startswith('#offset'): seqid, offset = line.split()[1:] offsets[seqid] = int(offset) elif line.startswith('#'): continue for ft in fts: ft.meta._gff.offset = offsets.get(ft.seqid) return fts2anchors(fts, no_cds=no_cds)
[docs] def _parse_selection(anchors, selection): """ Parse string specifying anchors and select these See ``anchorna combine -h`` """ anchors2 = anchors[:0] for part in selection.split(','): if ':' in part: i, j = part.split(':') i = int(i.strip().removeprefix('a')) j = int(j.strip().removeprefix('a')) else: i = int(part.strip().removeprefix('a')) j = i + 1 anchors2.extend(anchors[i:j]) return anchors2
[docs] def read_anchors(fname, check_header=True): """ Read anchors from GFF file Offsets are restored from comments. Additionally, anchors can be selected and/or removed with a special syntax, see ``anchorna combine -h`` """ fname = str(fname) if '|' not in fname: return _read_anchors(fname, check_header=check_header) fname, selection = fname.split('|', 1) anchors = _read_anchors(fname.strip(), check_header=check_header) selection = selection.lower() if '|' not in selection: return _parse_selection(anchors, selection) selection, rem = selection.split('|') anchors_remove = set(_parse_selection(anchors, rem)) anchors = _parse_selection(anchors, selection) if selection.strip() else anchors anchors.data = [a for a in anchors if a not in anchors_remove] return anchors
[docs] class _AnchorJSONEncoder(json.JSONEncoder):
[docs] def default(self, o): if isinstance(o, (Anchor, AnchorList, Fluke)): obj = {k: v for k, v in o.__dict__.items()} obj['_cls'] = type(o).__name__ return obj else: # Let the base class default method raise the TypeError return json.JSONEncoder.default(self, o)
[docs] def _json_hook(d): if cls := d.pop('_cls', None): return globals()[cls](**d) else: return d
[docs] def load_json(fname): """ Load anchors from JSON file, experimental """ if fname in ('-', None): return json.loads(sys.stdin.read(), object_hook=_json_hook) else: with open(fname) as f: return json.load(f, object_hook=_json_hook)
[docs] def write_json(obj, fname): """ Write anchors to JSON file, experimental """ if fname in ('-', None): print(json.dumps(obj, cls=_AnchorJSONEncoder)) else: with open(fname, 'w') as f: json.dump(obj, f, cls=_AnchorJSONEncoder)
[docs] def export_dialign(anchors, seqids, mode='aa', score_use_fluke=None): """ Export anchors to Dialign anchor file """ assert mode in ('nt', 'cds', 'aa') sortkey_score = lambda f: f.score sortkey_ids = lambda f: seqids.index(f.seqid) content = [] for anchor in anchors: f0 = anchor.sort(key=sortkey_score)[-1] start0 = f0._apply_mode(mode)[0] i = sortkey_ids(f0) anchor.sort(key=sortkey_ids) for f in anchor: j = sortkey_ids(f) if (f == f0 or score_use_fluke is not None and f.score < score_use_fluke): continue assert f.len == f0.len start, stop = f._apply_mode(mode) content.append( f'{i+1} {j+1} {start0+1} {start+1} {stop-start} {f.score}\n' ) return ''.join(content)
[docs] def _make_rgb_transparent(rgb, bg_rgb, alpha): return [alpha * c1 + (1 - alpha) * c2 for (c1, c2) in zip(to_rgb(rgb), to_rgb(bg_rgb))]
[docs] def export_jalview(anchors, mode='aa', score_use_fluke=None): """ Export anchors to Jalview feature file """ assert mode in ('nt', 'cds', 'aa') cols = list(mcolors.TABLEAU_COLORS.values()) anchors = sorted(anchors, key=lambda a: a.guide.start) content = [] header = [] for k, a in enumerate(anchors): poor = sum(f.poor for f in a) for j, f in enumerate(a): if score_use_fluke is not None and f.score < score_use_fluke: continue # anchor2 C_CSFV_KC533775 -1 130 150 anchorsim 1.0 # anchor100 D_BDV_NC_003679 -1 10 20 anchorxx w = a.guide.len score_ = max(1, f.median_score) / a.maxscore c = to_hex(_make_rgb_transparent(cols[k % len(cols)], 'white', score_)).strip('#') al = f'anchor{k}_s{f.median_score}' header.append( f'{al}\t{c}\n' ) i, j = f._apply_mode(mode) content.append(f'{f.word[:5]} w{w} poor:{poor}\t{f.seqid}\t-1\t{i+1}\t{j}\t{al}\n') header.append('\nSTARTFILTERS\nENDFILTERS\n\n') return ''.join(header) + ''.join(content)
[docs] def export_locarna(anchors, mode='nt', score_use_fluke=None): """ Export anchors to 4 column bed files usable with ``mlocarna ----anchor-constraints`` """ assert mode in ('nt', 'cds', 'aa') anchors = sorted(anchors, key=lambda a: a.guide.start) content = [] for k, a in enumerate(anchors): for j, f in enumerate(a): if score_use_fluke is not None and f.score < score_use_fluke: continue # A 10 16 first_box # B 8 14 first_box # A 39 42 ACA-box # B 25 28 ACA-box i, j = f._apply_mode(mode) content.append( f'{f.seqid}\t{i}\t{j-1}\tA{k}\n' ) return ''.join(content)
[docs] def export_stockholm(anchors, seqs, mode='nt', score_use_fluke=None, gap='-.'): """ Export anchors to a Stockholm GC line """ from sugar._io.stockholm import fts2row assert mode in ('nt', 'cds', 'aa') fts = anchors.convert2fts(mode=mode) for ft in fts: ft.name = ft.name.split('_')[0] fts_for_export = [] for name, fts_single_anchor in fts.groupby('name').items(): starts = set() stops = set() for ft in fts_single_anchor: # switch from sequence locations to alignment locations try: ind = seqs.d[ft.seqid].slindex(gap=gap)[ft] except KeyError: from warnings import warn warn(f'Seq with id {ft.seqid} not present') continue starts.add(ind.start) stops.add(ind.stop) if len(starts) != 1 or len(stops) != 1: msg = (f'Anchor {name} is not aligned in this alignment, ' 'please check the alignment with anchorna view, ' 'or remove conflicting anchors with anchorna combine.') raise ValueError(msg) ft.loc.start = starts.pop() ft.loc.stop = stops.pop() fts_for_export.append(ft) row = fts2row(fts.__class__(fts_for_export), lensec=min(len(seq) for seq in seqs)) seqs.meta.setdefault('_stockholm').setdefault('GC').AnchoRNA = row return seqs