optimization_of_DNA-library.../info_pol_mapq_union.py
2023-12-18 14:31:34 +03:00

171 lines
5.5 KiB
Python

# -*- coding: utf-8 -*-
import sys
import re
def GC(buf):
gc = buf.upper().count('G') + buf.upper().count('C')
all = buf.upper().count('A') + buf.upper().count('T') + gc
return (gc / all * 100)
def len_align(buf):
"""
нужно передавать CIGAR
"""
let = re.split('[0-9]+', buf)
let.remove("")
num = re.split('[MIDSH]{1}', buf)
num.remove("")
num = list(map(int, num))
length = 0
length_read = 0
for i in range(len(let)):
if (let[i]=="M") or (let[i]=="I"):
length+=num[i]
length_read+=num[i]
elif ((let[i]=="S") or (let[i]=="H")):
length_read+=num[i]
return (length, length_read)
def info (buf, arg = "I"):
"""
Передавать CIGAR
"""
let = re.split('[0-9]+', buf)
let.remove("")
num = re.split('[MIDSH]{1}', buf)
num.remove("")
num = list(map(int, num))
count = 0
for i in range(len(let)):
if (let[i]==arg):
count+=num[i]
return count
def info_insert (buf):
"""
Передавать CIGAR, возращает позиции инсерции, включая левый символ и не включая правый символ
"""
let = re.split('[0-9]+', buf)
let.remove("")
num = re.split('[MIDSH]{1}', buf)
num.remove("")
num = list(map(int, num))
start = 0
intervals = []
for i in range(len(let)):
if (let[i]=="I"):
intervals.append([start, start+num[i]])
if (let[i]=="D") or (let[i]=="H"):
continue
start+=num[i]
return tuple(intervals)
def removal (buf):
"""
Передавать CIGAR, возращает позиции удаления, включая левый символ и не включая правый символ
"""
let = re.split('[0-9]+', buf)
let.remove("")
num = re.split('[MIDSH]{1}', buf)
num.remove("")
num = list(map(int, num))
start = 0
intervals = []
for i in range(len(let)):
if (let[i]=="I") or (let[i]=="S"):
intervals.append([start, start+num[i]])
if (let[i]=="D") or (let[i]=="H"):
continue
start+=num[i]
return tuple(intervals)
def type_snp_text (seq, buf, cigar):
positions_ins = info_insert(cigar)
inserts_text = ''
inserts = []
for i in positions_ins:
inserts.append((seq[i[0]:i[1]], i[0]))
inserts_text = inserts_text + str(seq[i[0]:i[1]]) + "," + str(i[0]) + '|'
inserts_text = inserts_text[:-1]
gc_per = GC(seq)
positions = removal(cigar)
for i in positions:
if (i[1]<(len(seq))):
seq = seq[:i[0]] + "_"*(i[1]-i[0]) + seq[i[1]:]
else:
seq = seq[:i[0]] + "_"*(i[1]-i[0])
seq = seq.replace("_", "")
buf = buf.replace("MD:Z:", "")
types = {"A>C":0, "A>G":0, "A>T":0, "C>A":0, "C>G":0, "C>T":0, "G>A":0, "G>C":0, "G>T":0, "T>A":0, "T>C":0, "T>G":0}
cycles_text = ''
dels_text = ''
cycles = []
dels = []
let = re.split('[0-9]+', buf)
let.pop(0)
num = re.split('[ATGC^]+', buf)
num = list(map(int, num))
iter = 0
for i in range(len(num)):
iter+=num[i]
if (let[i] == '') or (let[i] == '\n'):
continue
if (let[i][0] == '^'):
dels_text = dels_text + str(let[i][1:]) + "," + str(iter) + "|"
dels.append((let[i][1:], iter))
continue
if (seq[iter]=="A") or (seq[iter]=="C") or (seq[iter]=="G") or (seq[iter]=="T"):
key = let[i] + ">" + seq[iter]
types[key]+=1
cycles.append(iter)
cycles_text = cycles_text + str(iter) + ","
iter+=1
else:
iter+=1
l = len_align(cigar)
dels_text = dels_text[:-1]
cycles_text = cycles_text[:-1]
if len(dels)!=0:
len_del = sum([len(dels[i][0]) for i in range(len(dels))])/len(dels)
else:
len_del = 0
if len(inserts)!=0:
len_ins = sum([len(inserts[i][0]) for i in range(len(inserts))])/len(inserts)
else:
len_ins = 0
result = {"SNP": types, "Cycles": cycles_text, "Dels": dels_text, "Inserts" : inserts_text, "GC": gc_per, "Count_SNPs": sum(types.values()), "Mean_length_del": len_del, \
"Mean_length_insert": len_ins, "Read_length": l[1], "Align_length": l[0]}
return result
with open(sys.argv[3]+"custom_"+sys.argv[1]+"_mapq20_table.txt", 'w') as table:
header = "ID;Read;Count_SNPs;Cycles;GC;Mean_length_del;Mean_length_insert;Read_length;Align_length;Dels;Inserts;A>C;A>G;A>T;C>A;C>G;C>T;G>A;G>C;G>T;T>A;T>C;T>G\n"
table.write(header)
with open(sys.argv[2]+"custom_"+sys.argv[1]+".txt", 'r') as inp:
for line in inp:
columns = line.split("\t")
if int(columns[4])>20:
id = columns[0]
flag = columns[1]
binary = "{0:b}".format(int(flag))
if (binary[-1]=='0'):
typ = 0
elif len(binary)==7:
typ = 1
elif len(binary)>=8:
if (binary[-7]=='0') and (binary[-8]=='1'):
typ = 2
else:
typ = -1
else:
typ = -1
cigar = columns[5]
md = columns[12]
seq = columns[9]
inf = type_snp_text(seq, md, cigar)
stroke = id + ";" + str(typ) + ";" + str(inf['Count_SNPs']) + ";" + inf['Cycles'] + ";" + str(inf['GC']) + ";" + str(inf['Mean_length_del']) + ";" + str(inf['Mean_length_insert']) + ";" + str(inf['Read_length']) + ";" + str(inf['Align_length']) + ";" + inf['Dels'] + ";" + inf['Inserts'] + ";" + \
str(inf['SNP']['A>C']) + ";" + str(inf['SNP']['A>G']) + ";" + str(inf['SNP']['A>T']) + ";" + str(inf['SNP']['C>A']) + ";" + str(inf['SNP']['C>G']) + ";" + str(inf['SNP']['C>T']) + ";" + str(inf['SNP']['G>A']) + ";" + str(inf['SNP']['G>C']) + ";" + str(inf['SNP']['G>T']) + ";" + str(inf['SNP']['T>A']) + ";" + str(inf['SNP']['T>C']) + ";" + str(inf['SNP']['T>G']) + '\n'
table.write(stroke)