# -*- coding: utf-8 -*- import sys import re def GC(buf): gc = buf.upper().count('G') + buf.upper().count('C') all = buf.upper().count('A') + buf.upper().count('T') + gc return (gc / all * 100) def len_align(buf): """ нужно передавать CIGAR """ let = re.split('[0-9]+', buf) let.remove("") num = re.split('[MIDSH]{1}', buf) num.remove("") num = list(map(int, num)) length = 0 length_read = 0 for i in range(len(let)): if (let[i]=="M") or (let[i]=="I"): length+=num[i] length_read+=num[i] elif ((let[i]=="S") or (let[i]=="H")): length_read+=num[i] return (length, length_read) def info (buf, arg = "I"): """ Передавать CIGAR """ let = re.split('[0-9]+', buf) let.remove("") num = re.split('[MIDSH]{1}', buf) num.remove("") num = list(map(int, num)) count = 0 for i in range(len(let)): if (let[i]==arg): count+=num[i] return count def info_insert (buf): """ Передавать CIGAR, возращает позиции инсерции, включая левый символ и не включая правый символ """ let = re.split('[0-9]+', buf) let.remove("") num = re.split('[MIDSH]{1}', buf) num.remove("") num = list(map(int, num)) start = 0 intervals = [] for i in range(len(let)): if (let[i]=="I"): intervals.append([start, start+num[i]]) if (let[i]=="D") or (let[i]=="H"): continue start+=num[i] return tuple(intervals) def removal (buf): """ Передавать CIGAR, возращает позиции удаления, включая левый символ и не включая правый символ """ let = re.split('[0-9]+', buf) let.remove("") num = re.split('[MIDSH]{1}', buf) num.remove("") num = list(map(int, num)) start = 0 intervals = [] for i in range(len(let)): if (let[i]=="I") or (let[i]=="S"): intervals.append([start, start+num[i]]) if (let[i]=="D") or (let[i]=="H"): continue start+=num[i] return tuple(intervals) def type_snp_text (seq, buf, cigar): positions_ins = info_insert(cigar) inserts_text = '' inserts = [] for i in positions_ins: inserts.append((seq[i[0]:i[1]], i[0])) inserts_text = inserts_text + str(seq[i[0]:i[1]]) + "," + str(i[0]) + '|' inserts_text = inserts_text[:-1] gc_per = GC(seq) positions = removal(cigar) for i in positions: if (i[1]<(len(seq))): seq = seq[:i[0]] + "_"*(i[1]-i[0]) + seq[i[1]:] else: seq = seq[:i[0]] + "_"*(i[1]-i[0]) seq = seq.replace("_", "") buf = buf.replace("MD:Z:", "") types = {"A>C":0, "A>G":0, "A>T":0, "C>A":0, "C>G":0, "C>T":0, "G>A":0, "G>C":0, "G>T":0, "T>A":0, "T>C":0, "T>G":0} cycles_text = '' dels_text = '' cycles = [] dels = [] let = re.split('[0-9]+', buf) let.pop(0) num = re.split('[ATGC^]+', buf) num = list(map(int, num)) iter = 0 for i in range(len(num)): iter+=num[i] if (let[i] == '') or (let[i] == '\n'): continue if (let[i][0] == '^'): dels_text = dels_text + str(let[i][1:]) + "," + str(iter) + "|" dels.append((let[i][1:], iter)) continue if (seq[iter]=="A") or (seq[iter]=="C") or (seq[iter]=="G") or (seq[iter]=="T"): key = let[i] + ">" + seq[iter] types[key]+=1 cycles.append(iter) cycles_text = cycles_text + str(iter) + "," iter+=1 else: iter+=1 l = len_align(cigar) dels_text = dels_text[:-1] cycles_text = cycles_text[:-1] if len(dels)!=0: len_del = sum([len(dels[i][0]) for i in range(len(dels))])/len(dels) else: len_del = 0 if len(inserts)!=0: len_ins = sum([len(inserts[i][0]) for i in range(len(inserts))])/len(inserts) else: len_ins = 0 result = {"SNP": types, "Cycles": cycles_text, "Dels": dels_text, "Inserts" : inserts_text, "GC": gc_per, "Count_SNPs": sum(types.values()), "Mean_length_del": len_del, \ "Mean_length_insert": len_ins, "Read_length": l[1], "Align_length": l[0]} return result with open(sys.argv[3]+"custom_"+sys.argv[1]+"_mapq20_table.txt", 'w') as table: header = "ID;Read;Count_SNPs;Cycles;GC;Mean_length_del;Mean_length_insert;Read_length;Align_length;Dels;Inserts;A>C;A>G;A>T;C>A;C>G;C>T;G>A;G>C;G>T;T>A;T>C;T>G\n" table.write(header) with open(sys.argv[2]+"custom_"+sys.argv[1]+".txt", 'r') as inp: for line in inp: columns = line.split("\t") if int(columns[4])>20: id = columns[0] flag = columns[1] binary = "{0:b}".format(int(flag)) if (binary[-1]=='0'): typ = 0 elif len(binary)==7: typ = 1 elif len(binary)>=8: if (binary[-7]=='0') and (binary[-8]=='1'): typ = 2 else: typ = -1 else: typ = -1 cigar = columns[5] md = columns[12] seq = columns[9] inf = type_snp_text(seq, md, cigar) stroke = id + ";" + str(typ) + ";" + str(inf['Count_SNPs']) + ";" + inf['Cycles'] + ";" + str(inf['GC']) + ";" + str(inf['Mean_length_del']) + ";" + str(inf['Mean_length_insert']) + ";" + str(inf['Read_length']) + ";" + str(inf['Align_length']) + ";" + inf['Dels'] + ";" + inf['Inserts'] + ";" + \ str(inf['SNP']['A>C']) + ";" + str(inf['SNP']['A>G']) + ";" + str(inf['SNP']['A>T']) + ";" + str(inf['SNP']['C>A']) + ";" + str(inf['SNP']['C>G']) + ";" + str(inf['SNP']['C>T']) + ";" + str(inf['SNP']['G>A']) + ";" + str(inf['SNP']['G>C']) + ";" + str(inf['SNP']['G>T']) + ";" + str(inf['SNP']['T>A']) + ";" + str(inf['SNP']['T>C']) + ";" + str(inf['SNP']['T>G']) + '\n' table.write(stroke)