""" This is a work-in-progress script for merging DeComposition Corpus (DCC) with the psd files from the Penn-Parsed-Corpora of Historical English as well as the York-Toronto-Helsinki Corpus of Old English Prose. The script is meant to accompany the DCC corpus files (.csv). It """ __author__ = "Martin Kopf" __copyright__ = "2025, Decomposing Decomposition in Time" __version__ = ".1" __maintainer__ = "Martin Kopf" __email__ = "martin.kopf@uni-saarland.de" __status__ = "development" import pandas as pd import os import re def get_directories(): """ Ask user for all their directories. Create path for output if it does not exist. If .csv file doesn't exist or path to psd-files don't exist at path provided, FileNotFoundError will occur at reading attempt. """ tsv_path = '01_tsv' # Ask user for item of interest: ioi = int(input("Please choose which DCC-ITEM you would like to pass the alignment script.\n" "1 = AGAIN\n" "2 = EFT\n" "3 = RE-VERBS\n" "4 = ALMOST\n")) # Ask user for corpus of interest: coi = int(input("Please choose which CORPUS you would like to pass the alignment script.\n" "1 = YCOE\n" "2 = PPCME2\n" "3 = PPCEME\n" "4 = PPCMBE\n")) # ioi = 2 # coi = 1 do_item_of_interest_to_path = { 1 : 'DCC_AGAIN.csv', 2 : 'DCC_EFT.csv', 3 : 'DCC_RE-verbs.csv', 4 : 'DCC_ALMOST.csv', } do_corpus_of_interest_to_path = { 1 : 'psd_YCOE', 2 : 'psd_PPCME', 3 : 'psd_PPCEME', 4 : 'psd_PPCMBE', # 1 : 'test_psd_YCOE', # 2 : 'test_psd_PPCME', # 3 : 'test_psd_PPCEME', # 4 : 'test_psd_PPCMBE' } path_to_tsv = os.path.join(tsv_path, do_item_of_interest_to_path[int(ioi)]) path_to_psd_files = do_corpus_of_interest_to_path[int(coi)] # Make something like this as output directory: 'psd_DC_YCOE' path_to_psd_OUT = path_to_psd_files.replace('psd_', 'psd_DCC_') if not os.path.exists(path_to_psd_OUT): print(f"The output directory {path_to_psd_OUT} does not exist. Creating.") os.makedirs(path_to_psd_OUT) print(f"Aligning {path_to_tsv} with {path_to_psd_files}.\n") return path_to_tsv, do_corpus_of_interest_to_path[coi], path_to_psd_OUT def read_annotations_as_df(path_to_tsv): """Here we read the annotations in the .csv file as pandas dataframe.""" df_anno = pd.read_csv(path_to_tsv, sep='\t').fillna('') return df_anno def make_numbered_psd(psd_unit_in): """ Add a number (followed by '§' as a separator) to every node; '(NP-SBJ (PRO they))' --> '(105§NP-SBJ (106§PRO they))'. In case a parse is being fed repeatedly after adding nodes, this is needs to be sensitive to existing numbers. :param psd_unit_in: :return psd_unit_out: """ # no_present_numbers = len(re.findall("\(\d+?§", psd_unit_in)) # node_number = no_present_numbers psd_unit_in = re.sub('\(\d+?§', '(', psd_unit_in) node_number = 0 list_of_labels = re.findall("\((.*?)\s", psd_unit_in) psd_unit_out = psd_unit_in[:] while len(re.findall("§", psd_unit_out)) < len(list_of_labels): mo = re.search("\(([^§]+?)\s", psd_unit_out) node_number += 1 auf = mo.span()[0] aus = mo.span()[1] new_label = str(node_number) + "§" + psd_unit_out[auf+1:aus-1] psd_unit_out = psd_unit_out[:auf+1] + new_label + psd_unit_out[aus-1:] return psd_unit_out def read_all_psd_files(path): """ Read all psd files from a directory. :param path: :return: all the psd files as one long stretch of test and a list of all the psd-files in the directory. """ all_psd_text = '' lo_files = [f for f in sorted(os.listdir(path)) if f.endswith('.psd')] for psd_f in lo_files: with open(os.path.join(path, psd_f), 'r') as f: all_psd_text += f"({psd_f} BOF)\n" all_psd_text += f.read() all_psd_text += f"({psd_f} EOF)\n" return all_psd_text, lo_files def split_psd_string(psd_string_file): # split the psd content from a file into smaller bits; # i.e. one bit/psd-unit per corpus token; return as list of psd-units; # do this by counting up/down for open/close parenthesis characters; # start by collecting indices first: list_of_psd_unit_spans = list() counter = 0 for idx, char in enumerate(psd_string_file): if char == "(" and counter == 0: start = idx counter += 1 continue elif char == "(" and counter != 0: counter += 1 continue elif char == ")" and counter != 1: counter -= 1 continue elif char == ")" and counter == 1: counter -= 1 end = idx + 1 list_of_psd_unit_spans.append((start, end)) continue else: continue # Once the index-spans are all collected, iterate through them to extract the strings; # He we can start calling these psd-units -- they each represent one corpus-token: list_of_psd_units_all = list() for idx, span in enumerate(list_of_psd_unit_spans): # dict_psd_units[idx + 1] = psd_string_file[span[0]:span[1]] list_of_psd_units_all.append(psd_string_file[span[0]:span[1]]) return list_of_psd_units_all def get_dict_corpus_idee_to_psd_unit(lo_corpusID, lo_psd_token): """ Make a dictionary with all relevant PSD-units from an independent instance of a treebank/corpus (.psd-file(s)). The keys will be the dict-keys, the remaining PSD-strings will be the dict-values. Assumption: There are no white space between the bare corpus ID and the closing parenthesis immediately following the corpus ID. If your corpus has these white spaces, switch to the regex-search version. """ N = len(lo_psd_token) do_ID_psd_token = {} for n, psd_token in enumerate(lo_psd_token): if n % 20000 == 0 and n > 0: print(f"progress: {round(n*100/N, 1)}%") for corpusID in lo_corpusID: if corpusID+')' in psd_token: do_ID_psd_token[corpusID] = psd_token return do_ID_psd_token def reduce_annoDataframe_to_revelant_annotations(df_anno, do_ID_psd_token): """ This is the 'sister function' to get_dict_corpus_idee_to_psd_unit(). It'll reduce the annotations dataframe to the only those corpus IDs that are in the corpus data provided by the user. """ do_ID_psd_token = list(do_ID_psd_token.keys()) df_anno = df_anno[df_anno['corpusID'].isin(do_ID_psd_token)] return df_anno def process_one_anno(targetID, corpusID, l_object, position, reading, token, psd_token): psd_token_nrd = make_numbered_psd(psd_token) nrd_terminals = re.findall('\([^()]+?\)', psd_token_nrd) do_idx_nrdTerminal = {} do_idx_node_numbrs = {} # Filter out terminal nodes based on 'Empty Categories' which were removed for querying and annotation; # by removing them, we can (better) align the surface (non-empty) object language items with the items # in the semantic annotation string; # (cf. https://www.ling.upenn.edu/ppche/ppche-release-2016/annotation/syn-empty.htm) lo_nrd_terminals = [] for t in nrd_terminals: # if corpusID == 'PENNY-E3-P2,165.128' or corpusID == 'MTUDOR-1510-E1-P1,1.1,118.19': # print(t) if t.endswith(' *)'): continue elif re.search(' 0\)', t): continue elif re.search(' \*\S*?\)', t): continue elif re.search('CODE ', t): continue elif re.search(', ', t): continue elif re.search(' \.\.\.\)', t): continue # lo_nrd_terminals.append(t) # elif re.search('\.\.\.', t): # lo_nrd_terminals.append(t) elif re.search('\. ', t): continue elif re.search('" ', t): continue elif re.search("' ", t): continue elif re.search("\{\S+?\}", t): continue else: lo_nrd_terminals.append(t) for ix, terminal in enumerate(lo_nrd_terminals): try: terminal_item = terminal[1:-1].split(' ')[1] except IndexError as e: print(f"{targetID}: {corpusID} -- (lemma: ><) - {token}\n" f"terminal: {terminal}\n" f"{psd_token}\n\n") do_idx_nrdTerminal[ix] = terminal_item do_idx_node_numbrs[ix] = terminal[1:-1].split(' ')[0].split('§')[0] continue try: mo = re.search(f"\({do_idx_node_numbrs[position]}§\S+? \S+?\)", psd_token_nrd) # fix_this except KeyError as e: print(f"{e}\n" f"targetID: {targetID},\ncorpusID:{corpusID},\n" f"l_object: {l_object}, at position: {position},\n" f"reading: {reading}, token: {token},\n" f"psd_token_nrd: {psd_token_nrd}\n" f"lo_nrd_terminals: {lo_nrd_terminals}\n" f"do_idx_nrdTerminal: {do_idx_nrdTerminal}\n" f"do_idx_node_numbrs: {do_idx_node_numbrs}\n" ) # return psd_token psd_stub_left = psd_token_nrd[:mo.span()[0]] my_psd_nugget = psd_token_nrd[mo.span()[0]:mo.span()[1]] psd_stub_right = psd_token_nrd[mo.span()[1]:] def make_DC_string_1(nrd_label, targetID, reading, anno_type, position, item): return f"{nrd_label} (DC{targetID}:{reading}:{anno_type}:{position} {item})" anno_as_additional_parenthesis_around_terminal_object_lang_item = False if anno_as_additional_parenthesis_around_terminal_object_lang_item: nrd_label, item = my_psd_nugget.split() if l_object.replace('_', '').startswith('*'): anno_type = 'PRD' # antecedent material out_psd_nugget = make_DC_string_1(nrd_label, targetID, reading, anno_type, position, item) elif l_object.replace('*', '').startswith('_'): anno_type = 'ANT' # predicate holding main verb out_psd_nugget = make_DC_string_1(nrd_label, targetID, reading, anno_type, position, item) else: anno_type = 'TRG' # trigger out_psd_nugget = make_DC_string_1(nrd_label, targetID, reading, anno_type, position, item) def make_DC_string_2(nrd_label, targetID, reading, anno_type, position, item): return f"{nrd_label} {item}@DC{targetID}:{reading}:{anno_type}:{position})" suffix_to_object_lang_item_with_at_sign = False if suffix_to_object_lang_item_with_at_sign: nrd_label, item = my_psd_nugget.split() item = item[:-1] if l_object.replace('_', '').startswith('*'): anno_type = 'PRD' # antecedent material out_psd_nugget = make_DC_string_2(nrd_label, targetID, reading, anno_type, position, item) elif l_object.replace('*', '').startswith('_'): anno_type = 'ANT' # predicate holding main verb out_psd_nugget = make_DC_string_2(nrd_label, targetID, reading, anno_type, position, item) else: anno_type = 'TRG' # trigger out_psd_nugget = make_DC_string_2(nrd_label, targetID, reading, anno_type, position, item) def make_DC_string_3(nrd_label, targetID, reading, anno_type, position, item): return f"({nrd_label}@DC{targetID}:{reading}:{anno_type}:{position} {item})" suffix_to_pos = True if suffix_to_pos: nrd_label, item = my_psd_nugget[1:-1].split() if l_object.replace('_', '').startswith('*'): anno_type = 'PRD' # antecedent material out_psd_nugget = make_DC_string_3(nrd_label, targetID, reading, anno_type, position, item) elif l_object.replace('*', '').startswith('_'): anno_type = 'ANT' # predicate holding main verb out_psd_nugget = make_DC_string_3(nrd_label, targetID, reading, anno_type, position, item) else: anno_type = 'TRG' # trigger out_psd_nugget = make_DC_string_3(nrd_label, targetID, reading, anno_type, position, item) def make_DC_string_4(targetID, reading, anno_type, position, my_psd_nugget): return f"(DC{targetID}:{reading}:{anno_type}:{position} {my_psd_nugget})" onTopOf_terminals = False # works if onTopOf_terminals: if l_object.replace('_', '').startswith('*'): anno_type = 'PRD' # antecedent material out_psd_nugget = make_DC_string_4(targetID, reading, anno_type, position, my_psd_nugget) # print(out_psd_nugget) elif l_object.replace('*', '').startswith('_'): anno_type = 'ANT' # predicate holding main verb out_psd_nugget = make_DC_string_4(targetID, reading, anno_type, position, my_psd_nugget) # print(out_psd_nugget) else: anno_type = 'TRG' # trigger out_psd_nugget = make_DC_string_4(targetID, reading, anno_type, position, my_psd_nugget) # print(out_psd_nugget) psd_token_out = psd_stub_left + out_psd_nugget + psd_stub_right if l_object.replace('+', '').replace('*', '').replace('_', '').lower() == re.sub('@.*?$', '', do_idx_nrdTerminal[position]).replace('+', '').replace('*', '').replace('_', '').lower().replace('-', '').replace("'", ''): # print(f"\n {targetID, corpusID, l_object, position, reading, givenness}\n" # f" -> match: {l_object} == {do_idx_nrdTerminal[position]}\n" # f"\nnrd Terms: \n{list(do_idx_nrdTerminal.values())}\n" # f" {token}\n" # f" {psd_token}\n" # f" {psd_token_nrd}\n" # f" {do_idx_node_numbrs}\n" # f" {lo_nrd_terminals}\n" # f" {do_idx_nrdTerminal}" # f" {psd_token_out}\n" # ) pass else: print(f" {targetID, corpusID, l_object, position, reading}\n" f"mismatch: {l_object} != {do_idx_nrdTerminal[position]}\n" f"\ntoken: \n{token}\n" # f" \n{psd_token}\n" # f"\n \n{psd_token_nrd}\n" f"\ndo_ix_nrdT:\n{do_idx_nrdTerminal}\n" f"\nnrd Terms: \n{list(do_idx_nrdTerminal.values())}\n" f"\ndo ix-nrN: \n{do_idx_node_numbrs}\n" f"\nlo_nrdTerm:\n{lo_nrd_terminals}\n" # f" {psd_token_out}\n" ) psd_token_out_c = psd_token_out.replace('\t', '').replace('\n', '') # print(f"done with {corpusID}: {psd_token_out_c}") return psd_token_out def align_annos_with_psd(df_anno, lo_psd_token): """ Merge all annotations into the PSD-units they belong to. As input we have (i.) a dataframe with the annotations and (ii.) a list of all psd tokens(/units). :param df_anno: pandas DF with 1 annotation (any type) per row. :param lo_psd_token: list of psd tokens. :return lo_psd_token_out: list of psd tokens that have the annotations 'built in'. """ # Get dict of ID to relevant psd tokens (see function description for details): do_ID_rel_psd_tokens = get_dict_corpus_idee_to_psd_unit(df_anno.corpusID.unique(), lo_psd_token) # Reduce the annotations dataframe (df_anno) to contain only relevant annotations: df_anno = reduce_annoDataframe_to_revelant_annotations(df_anno, do_ID_rel_psd_tokens) # Iterate over every row in the dataframe with all the annotations. On each apply 'process_one_anno'. # In addition to all the anno-stuff, pass the psd-unit that is required for this particular annotation. # The function will return an updated psd-unit and overwrite the dictionary. print("Aligning annotations with psd.") # drop_cols = ['givenness', 'iterative', 'elliptical', 'mod_lemma'] for i in list(df_anno.index): # print(f"{i}, " # f"targetID {df_anno.targetID[i]}, " # f"corpusID {df_anno.corpusID[i]}, " # f"position {df_anno.position[i]}, " # f"{do_ID_rel_psd_tokens[df_anno.corpusID[i]]}" # f"") do_ID_rel_psd_tokens[df_anno.corpusID[i]] = process_one_anno( df_anno.targetID[i], df_anno.corpusID[i], df_anno.l_object[i], df_anno.position[i], df_anno.reading[i], # df_anno.givenness[i], df_anno.token[i], # df_anno.elliptical[i], # df_anno.iterative[i], # df_anno.mod_lemma[i], do_ID_rel_psd_tokens[df_anno.corpusID[i]] ) # print(f"do_ID_rel_psd_tokens[df_anno.corpusID[i]] (len={len(do_ID_rel_psd_tokens[df_anno.corpusID[i]])}): " # f"{do_ID_rel_psd_tokens[df_anno.corpusID[i]]}") # if i % 10000 == 0: # print(f"progress: {round(i * 100 / len(df_anno), 2)} % ") # print(f"len of do_ID_rel_psd_tokens: {len(do_ID_rel_psd_tokens)}") # Get rid of all the makeshift node numbers: # print("Get rid of all the makeshift node numbers") do_ID_rel_psd_tokens = {k: re.sub("\(\d+?§", '(', v) for k, v in do_ID_rel_psd_tokens.items()} # print(f"len of do_ID_rel_psd_tokens: {len(do_ID_rel_psd_tokens)}") # print(df_anno['do_idx_node_numbrs']) # print(df_anno) # for k, v in do_ID_rel_psd_tokens.items(): # print(f"{k}:\n{v}\n\n") # print(f"len of lo_psd_token: {len(lo_psd_token)}") lo_psd_token_out = [] for i, corp_tok in enumerate(lo_psd_token): # if i % 10000 == 0: # print(f"progress: {round(i * 100 / len(lo_psd_token), 2)}% ") if re.search('\(ID (\S*?)\)', corp_tok): id = re.search('\(ID (\S*?)\)', corp_tok).group(1) # if any(re.search(id, tok) for tok in do_ID_rel_psd_tokens.keys()):# SLOW! if id in do_ID_rel_psd_tokens.keys(): for ID, anno_tok in do_ID_rel_psd_tokens.items(): if ID == id: lo_psd_token_out.append(anno_tok) # For non-relevant psd-units: else: lo_psd_token_out.append(corp_tok) # For non-ID-bearing psd-units: else: lo_psd_token_out.append(corp_tok) # print(f"len of lo_psd_token_out: {len(lo_psd_token_out)}") return lo_psd_token_out def sort_psd_units_into_files(lo_psd_token, path_to_psd_OUT): psd_text = '\n'.join([t+'\n' for t in lo_psd_token]) lo_all_psd_lines = psd_text.split('\n') # print(f"psd_text: {psd_text[:200]}") # print(f"lo_all_psd_lines: {lo_all_psd_lines[:10]}") do_fname_index = {} for i, l in enumerate(lo_all_psd_lines): if ' BOF' in l: fname = l.replace('(', '').replace(' BOF)', '') # print(f"fname: {fname} (BOF)") do_fname_index[fname] = [i+2] # +2 because we want to exclude a. the BOF-tag & b. the empty line after it. # print(f"do_fname_index[fname]: {do_fname_index[fname]} (index of line where {fname} begins)") if ' EOF' in l: fname = l.replace('(', '').replace(' EOF)', '') # print(f"fname: {fname} (EOF)") do_fname_index[fname].append(i-1) # print(f"do_fname_index[fname]: {do_fname_index[fname]} (index of line where {fname} begins AND ends)") # print(f"done with loop; do_fname_index: {do_fname_index}") # for k, v in do_fname_index.items(): # print(k, v) for idx, (fname, po_indices) in enumerate(do_fname_index.items()): # print(idx, fname, po_indices) if len(po_indices) == 2: with open(os.path.join(path_to_psd_OUT, fname), 'w') as f: f.writelines([l+'\n' for l in lo_all_psd_lines[po_indices[0]:po_indices[1]]]) elif len(po_indices) == 1: # print(" ", idx, fname, po_indices) if idx == len(do_fname_index) - 1: with open(os.path.join(path_to_psd_OUT, fname), 'w') as f: f.writelines([l+'\n' for l in lo_all_psd_lines[po_indices[0]:]]) pass path_to_tsv, path_to_psd_files, path_to_psd_OUT = get_directories() # print(f"{path_to_tsv}, {path_to_psd_files}, {path_to_psd_OUT}") print("Reading and preparing psd-file(s).\n") psd_string, lo_files = read_all_psd_files(path_to_psd_files) lo_psd_token = split_psd_string(psd_string) print("Reading and preparing annotations.\n") df_anno = read_annotations_as_df(path_to_tsv) print("Saving new psd units to file(s).\n") sort_psd_units_into_files(lo_psd_token, path_to_psd_OUT) print("Done.") os.system(f"say 'done'")