import asyncio
import json
import os
import re
import sys
import time
from itertools import combinations as itertools_combinations
from urllib.parse import quote

import aiohttp
import httpx
import nest_asyncio
import numpy as np
import requests
import streamlit as st
from bs4 import BeautifulSoup
from openai import OpenAI
from sentence_transformers import SentenceTransformer
from SPARQLWrapper import JSON, SPARQLWrapper
|
|
|
|
# Patch asyncio so that asyncio.run() can be used even when an event loop is
# already running in the hosting process.
nest_asyncio.apply()


# Scratch directory for the per-mention QID lists written by ``mains``.
folder_path = '/home/user/app/qids_folder'

# makedirs(exist_ok=True) creates missing parents and does not fail when the
# directory appears between the check and the creation.
os.makedirs(folder_path, exist_ok=True)


# Scratch directory for the per-mention label/description records written by
# ``main``.
folder_path_1 = '/home/user/app/info_extraction'

if not os.path.exists(folder_path_1):
    os.makedirs(folder_path_1, exist_ok=True)
    print(f"Folder created at {folder_path_1}")


# Sentence-embedding model shared by the whole application.
# NOTE(review): trust_remote_code=True executes Python code shipped with the
# model repository - confirm that this repository is trusted.
model = SentenceTransformer("Lajavaness/bilingual-embedding-large", trust_remote_code=True)
|
|
async def fetch_json(url, session):
    """Issue a GET request for ``url`` on ``session`` and return the decoded JSON body.

    ``session`` must expose ``get(url)`` returning an async context manager
    whose value has an awaitable ``json()`` method.
    """
    async with session.get(url) as response:
        payload = await response.json()
    return payload
|
|
async def combination_method(name, session):
    """Search Wikipedia for every two-word combination of ``name``.

    Args:
        name: Mention text; it is split on whitespace into words.
        session: HTTP session used for the requests. When ``None`` a
            temporary ``aiohttp.ClientSession`` is opened for this call.

    Returns:
        A set with the titles of all pages returned by the searches.
    """
    if session is None:
        async with aiohttp.ClientSession() as own_session:
            return await combination_method(name, own_session)

    titles = set()
    for first, second in itertools_combinations(name.split(), 2):
        # Percent-encode the term so characters such as '&', '#' or '+'
        # cannot alter the query string.
        term = quote(f"{first} {second}", safe="")
        url = f"https://en.wikipedia.org/w/api.php?action=query&list=search&srsearch={term}&srlimit=20&srprop=&srenablerewrites=True&format=json"
        payload = await fetch_json(url, session)
        for hit in payload.get('query', {}).get('search', []):
            title = hit.get('title')
            # A hit without a title carries nothing usable; skip it.
            if title:
                titles.add(title)
    return titles
|
|
async def single_method(name, session):
    """Search Wikipedia separately for every word of ``name``.

    Hyphens and slashes are treated as word separators before splitting.

    Args:
        name: Mention text.
        session: HTTP session used for the requests. When ``None`` a
            temporary ``aiohttp.ClientSession`` is opened for this call.

    Returns:
        A set with the titles of all pages returned by the searches.
    """
    if session is None:
        async with aiohttp.ClientSession() as own_session:
            return await single_method(name, own_session)

    titles = set()
    for word in name.replace("-", " ").replace("/", " ").split():
        # Percent-encode the word so characters such as '&', '#' or '+'
        # cannot alter the query string.
        term = quote(word, safe="")
        url = f"https://en.wikipedia.org/w/api.php?action=query&list=search&srsearch={term}&srlimit=20&srprop=&srenablerewrites=True&format=json"
        payload = await fetch_json(url, session)
        for hit in payload.get('query', {}).get('search', []):
            title = hit.get('title')
            # A hit without a title carries nothing usable; skip it.
            if title:
                titles.add(title)
    return titles
|
|
async def mains(name, deep_search):
    """Retrieve candidate Wikidata QIDs for ``name`` and store them on disk.

    Steps:
      1. Full-text search on English Wikipedia for ``name`` (plus the search
         engine's spelling suggestion, when one is returned).
      2. For each hit, list the pages it links to. A hit that links to
         "Help:Disambiguation" is replaced by its linked article titles
         (titles containing ":" or "disambiguation" are dropped); any other
         hit is kept as is.
      3. When ``deep_search == "Yes"``, add the results of
         ``combination_method`` (names of three or more words) and
         ``single_method`` (names of two or more words).
      4. Resolve every collected title to its ``wikibase_item`` page property.

    The resulting QIDs are written as a JSON list to
    ``/home/user/app/qids_folder/{name}.json``.

    Args:
        name: Mention text to look up. It is also used verbatim as the output
            file name.
        deep_search: ``"Yes"`` to enable the extra word-level searches.
    """
    data = set()
    disam_data = set()
    qids = set()

    # Percent-encode everything interpolated into a query string so that
    # characters such as '&', '#' or '+' cannot alter the request.
    quoted_name = quote(name, safe="")

    async with aiohttp.ClientSession() as session:
        url = f"https://en.wikipedia.org/w/api.php?action=query&list=search&srsearch={quoted_name}&srlimit=20&srprop=&srenablerewrites=True&format=json"
        json_data = await fetch_json(url, session)
        for hit in json_data.get('query', {}).get('search', []):
            title = hit.get('title')
            if title:
                data.add(title)

        wikipedia_url = f"https://en.wikipedia.org/w/api.php?action=query&list=search&srsearch={quoted_name}&srlimit=1&srprop=&srenablerewrites=True&srinfo=suggestion&format=json"
        json_data = await fetch_json(wikipedia_url, session)
        suggestion = json_data.get('query', {}).get('searchinfo', {}).get('suggestion')

        if suggestion:
            suggested_url = f"https://en.wikipedia.org/w/api.php?action=query&list=search&srsearch={quote(suggestion, safe='')}&srlimit=10&srprop=&srenablerewrites=True&srinfo=suggestion&format=json"
            json_suggestion = await fetch_json(suggested_url, session)
            # 'search' may be absent from the reply; treat that as no hits.
            for hit in json_suggestion.get('query', {}).get('search') or []:
                title = hit.get('title')
                if title:
                    data.add(title)

        for ids in data:
            titles = set()
            wikipedia_disambiguation = f"https://en.wikipedia.org/w/api.php?action=query&generator=links&format=json&redirects=1&titles={quote(ids, safe='')}&prop=pageprops&gpllimit=500&ppprop=wikibase_item"
            json_id = await fetch_json(wikipedia_disambiguation, session)
            # A page without outgoing links has no 'query'/'pages' entry.
            pages = (json_id.get('query') or {}).get('pages') or {}
            for page in pages.values():
                linked_title = page.get("title")
                if linked_title:
                    titles.add(linked_title)

            if "Help:Disambiguation" in titles:
                for linked_title in titles:
                    if ":" not in linked_title and "disambiguation" not in linked_title:
                        disam_data.add(linked_title)
            else:
                disam_data.add(ids)

        if deep_search == "Yes":
            if len(name.replace("-", " ").split()) >= 3:
                disam_data.update(await combination_method(name, session))
            if len(name.replace("-", " ").replace("/", " ").split()) >= 2:
                disam_data.update(await single_method(name, session))

        for ids in disam_data:
            wikibase_url = f"https://en.wikipedia.org/w/api.php?action=query&titles={quote(ids, safe='')}&prop=pageprops&format=json"
            try:
                json_qid = await fetch_json(wikibase_url, session)
            except (aiohttp.ClientError, asyncio.TimeoutError, ValueError) as error:
                # Best effort: one failed lookup must not abort the others.
                print(f"Skipping '{ids}': {error}")
                continue
            for page_data in json_qid.get('query', {}).get('pages', {}).values():
                wikibase_item = page_data.get('pageprops', {}).get('wikibase_item')
                if wikibase_item:
                    qids.add(wikibase_item)

    # Sorted so that the file content is reproducible between runs.
    with open(f"/home/user/app/qids_folder/{name}.json", "w") as f:
        json.dump(sorted(qids), f)
|
|
|
|
async def get_results(query):
    """Run a SPARQL query against the Wikidata Query Service without blocking.

    The request itself is the synchronous ``get_resultss``; it is executed in
    the event loop's default executor so that the blocking HTTP call does not
    stall other coroutines.

    Args:
        query: SPARQL query text.

    Returns:
        The converted JSON result, exactly as returned by ``get_resultss``.
    """
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, get_resultss, query)
| |
def get_resultss(query):
    """Synchronously run ``query`` on the Wikidata SPARQL endpoint.

    Returns the response converted from the JSON return format.
    """
    major, minor = sys.version_info[0], sys.version_info[1]
    endpoint = SPARQLWrapper(
        "https://query.wikidata.org/sparql",
        agent="WDQS-example Python/%s.%s" % (major, minor),
    )
    endpoint.setQuery(query)
    endpoint.setReturnFormat(JSON)
    response = endpoint.query()
    return response.convert()
|
|
| |
# Matches a brace group that contains no further braces (an innermost group).
_INNERMOST_BRACES = re.compile(r'\{[^{}]*\}')


def cleaner(text):
    """Normalize an extract: drop backslashes and ``{...}`` groups, squeeze spaces.

    Brace groups are removed from the inside out, so nested groups such as
    ``{\\displaystyle x^{2}}`` disappear completely instead of leaving a
    stray closing brace behind.

    Args:
        text: Raw text.

    Returns:
        The cleaned text with newlines turned into spaces, runs of spaces
        collapsed to one, and leading/trailing whitespace stripped.
    """
    text = text.replace('\\', '').replace('\n', ' ')
    previous = None
    # Repeat until a pass removes nothing, i.e. all nesting levels are gone.
    while previous != text:
        previous = text
        text = _INNERMOST_BRACES.sub('', text)
    return re.sub(' +', ' ', text).strip()
|
|
def _binding_values(results):
    """Return every bound ``value`` of a SPARQL JSON result, in result order."""
    values = []
    for binding in results.get("results", {}).get("bindings", []):
        for cell in binding.values():
            value = cell.get("value")
            if value is not None:
                values.append(value)
    return values


async def retriever(qid):
    """Collect the English label, aliases and a description for a Wikidata item.

    The description is the first two sentences of the item's English
    Wikipedia article; when the item has no English Wikipedia sitelink, its
    English Wikidata description is used instead. The string ``"None"``
    stands for a missing description.

    Args:
        qid: Wikidata item identifier, e.g. ``"Q42"``.

    Returns:
        A list of ``{"qid", "label", "description"}`` dicts: one for the
        lower-cased label (omitted when the item has no English label) and
        one per lower-cased alias, all sharing the same description.
    """
    async with aiohttp.ClientSession() as session:
        list_with_sent = []

        query_label = f"""SELECT ?subjectLabel
        WHERE {{
            wd:{qid} rdfs:label ?subjectLabel .
            FILTER(LANG(?subjectLabel) = "en")
        }}
        """
        labels = _binding_values(await get_results(query_label))
        label = labels[-1].lower() if labels else None

        query_alias = f"""SELECT ?alias
        WHERE {{
            wd:{qid} skos:altLabel ?alias
            FILTER(LANG(?alias) = "en")
        }}
        """
        alias_list = _binding_values(await get_results(query_alias))

        query_desci = f"""SELECT ?subjectLabel
        WHERE {{
            ?subjectLabel schema:about wd:{qid} ;
            schema:inLanguage "en" ;
            schema:isPartOf <https://en.wikipedia.org/> .
        }}
        """
        article_urls = _binding_values(await get_results(query_desci))
        cleaned_first_para = "None"

        if article_urls:
            for article_url in article_urls:
                _, separator, title = article_url.partition("/wiki/")
                if not separator:
                    # Not an article URL of the expected shape; nothing to fetch.
                    continue
                # ``title`` is taken from a URL and inserted unchanged; it is
                # deliberately not quoted again to avoid double-encoding.
                url = f"https://en.wikipedia.org/w/api.php?action=query&prop=extracts&titles={title}&exintro=&exsentences=2&explaintext=&redirects=&formatversion=2&format=json"
                json_data = await fetch_json(url, session)
                pages = json_data.get('query', {}).get('pages') or [{}]
                cleaned_first_para = cleaner(pages[0].get('extract', 'None'))
        else:
            query_desc = f"""SELECT ?subjectLabel
            WHERE {{
                wd:{qid} schema:description ?subjectLabel .
                FILTER(LANG(?subjectLabel) = "en")
            }}
            """
            descriptions = _binding_values(await get_results(query_desc))
            if descriptions:
                cleaned_first_para = descriptions[-1]

        # The label is embedded as text later in the pipeline, so an item
        # without an English label contributes only its aliases.
        if label is not None:
            list_with_sent.append({"qid": qid, "label": label, "description": cleaned_first_para})

        for alias in alias_list:
            list_with_sent.append({"qid": qid, "label": alias.lower(), "description": cleaned_first_para})

        return list_with_sent
|
|
async def main(name):
    """Expand the stored QIDs of ``name`` into label/description records.

    Reads the QID list from ``/home/user/app/qids_folder/{name}.json``, calls
    ``retriever`` for each QID in order, and writes the concatenated records
    to ``/home/user/app/info_extraction/{name}.json``.
    """
    qids_path = f"/home/user/app/qids_folder/{name}.json"
    records_path = f"/home/user/app/info_extraction/{name}.json"

    with open(qids_path, "r") as qids_file:
        records = []
        for candidate in json.load(qids_file):
            rows = await retriever(candidate)
            if not rows:
                continue
            records.extend(rows)

    with open(records_path, "w", encoding="utf-8") as records_file:
        json.dump(records, records_file)
|
|
# Scratch directories shared with ``mains`` and ``main``; emptied after a run.
_QIDS_DIR = "/home/user/app/qids_folder"
_INFO_DIR = "/home/user/app/info_extraction"

# System prompt of the first LLM call: expand acronyms and split compound labels.
_NORMALIZATION_PROMPT = """
I will give you one or more labels within a sentence. Your task is as follows:

Identify each label in the sentence, and check if it is an acronym.

If the label is an acronym, respond with the full name of the acronym.
If the label is not an acronym, respond with the label exactly as it was given to you.
If a label contains multiple terms (e.g., 'phase and DIC microscopy'), treat each term within the label as a separate label.

This means you should identify and explain each part of the label individually.
Each part should be on its own line in the response.
Context-Specific Terms: If the sentence context suggests a relevant term that applies to each label (such as "study" in 'morphological, sedimentological, and stratigraphical study'), add that term to each label’s explanation.

Use context clues to determine the appropriate term to add (e.g., 'study' or 'microscopy').
Output Format: Your response should contain only the explanations, formatted as follows:

Each label or part of a label should be on a new line.
Do not include any additional text, and do not repeat the original sentence.
Example 1:

Input:

label: phase and DIC microscopy
context: Tardigrades have been extracted from samples using centrifugation with Ludox AM™ and mounted on individual microscope slides in Hoyer's medium for identification under phase and DIC microscopy.
Expected response:

phase: phase microscopy
DIC microscopy: Differential interference contrast microscopy
Example 2:

Input:

label: morphological, sedimentological, and stratigraphical study
context: This paper presents results of a morphological, sedimentological, and stratigraphical study of relict beach ridges formed on a prograded coastal barrier in Bream Bay, North Island New Zealand.
Expected response:

morphological: morphological study
sedimentological: sedimentological study
stratigraphical: stratigraphical study
IMPORTANT:

Each label, even if nested within another, should be treated as an individual item.
Each individual label or acronym should be output on a separate line.
"""

# System prompt of the second LLM call: one short description per label.
_DESCRIPTION_PROMPT = "Given a label or labels within a sentence, provide a brief description (2-3 sentences) explaining what the label represents, similar to how a Wikipedia entry would. Format your response as follows: label: description. I want only the description of the label, not the role in the context. Include the label in the description as well. For example: Sentiment analysis: Sentiment analysis is the use of natural language processing, text analysis, computational linguistics, and biometrics to systematically identify, extract, quantify, and study affective states and subjective information.\nText analysis: Text mining, text data mining (TDM) or text analytics is the process of deriving high-quality information from text. It involves the discovery by computer of new, previously unknown information, by automatically extracting information from different written resources."


def _ask_model(client, model_name, system_prompt, user_prompt):
    """Send one system+user exchange to the chat model and return the reply text."""
    response = client.chat.completions.create(
        messages=[
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_prompt},
        ],
        temperature=1.0,
        top_p=1.0,
        max_tokens=1000,
        model=model_name,
    )
    content = response.choices[0].message.content or ""
    print(content)
    return content


def _parse_labelled_lines(text):
    """Split a reply made of ``label: value`` lines into ``(label, value)`` pairs.

    Blank lines are ignored. Only the first colon separates label and value,
    so a value may itself contain colons. A line without a colon, or with
    nothing after it, yields the label as its own value.
    """
    pairs = []
    for line in text.splitlines():
        if not line.strip():
            continue
        head, _, tail = line.partition(":")
        label = head.strip()
        pairs.append((label, tail.strip() or label))
    return pairs


async def _gather_candidates(names, deep_search):
    """Run candidate retrieval and information gathering for every name."""
    with st.spinner("Applying Candidate Retrieval module... (2/5)"):
        for full_name in names:
            await mains(full_name, deep_search)
    st.write("✅ Applied Candidate Retrieval module (2/5)")
    with st.spinner("Applying Information Gathering module... (3/5)"):
        for full_name in names:
            await main(full_name)
    st.write("✅ Applied Information Gathering module (3/5)")


def _rank_candidates(mention_name, mention_description, candidates, progress_bar):
    """Score candidates against the mention; return ``(link, score)`` best first.

    The score is the mean of the label similarity and the description
    similarity computed with the shared embedding ``model``.
    """
    embedded = []
    total = len(candidates)
    for index, element in enumerate(candidates):
        qid = element.get("qid")
        label = element.get("label")
        description = element.get("description")
        if description is None:
            # "None" is the placeholder this file uses for a missing description.
            description = "None"
        # A record without a label cannot be embedded as text; skip it.
        if label is not None:
            link = f"https://www.wikidata.org/wiki/{qid}"
            embedded.append((link, model.encode([label]), model.encode([description])))
        progress_bar.progress((index + 1) / total)
        print(qid)

    mention_label_emb = model.encode([mention_name])
    mention_desc_emb = model.encode([mention_description])

    scored = []
    for link, label_emb, desc_emb in embedded:
        cossim_label = model.similarity(mention_label_emb, label_emb[0])
        desc_label = model.similarity(mention_desc_emb, desc_emb[0])
        scored.append((link, np.mean([cossim_label, desc_label])))

    scored.sort(key=lambda item: item[1], reverse=True)
    return scored


def _linked_resources(qid):
    """Return ``(wikipedia_url, dbpedia_uri)`` for ``qid``; ``"None"`` when not found."""
    wikipedia = "None"
    dbpedia = "None"

    wikidata2wikipedia = f"""
    SELECT ?wikipedia
    WHERE {{
        ?wikipedia schema:about wd:{qid} .
        ?wikipedia schema:isPartOf <https://en.wikipedia.org/> .
    }}
    """
    results = get_resultss(wikidata2wikipedia)
    for result in results["results"]["bindings"]:
        for value in result.values():
            wikipedia = value.get("value", "None")

    sparql = SPARQLWrapper("http://dbpedia.org/sparql")
    wikidata2dbpedia = f"""
    SELECT ?dbpedia
    WHERE {{
        ?dbpedia owl:sameAs <http://www.wikidata.org/entity/{qid}>.
    }}
    """
    sparql.setQuery(wikidata2dbpedia)
    sparql.setReturnFormat(JSON)
    results = sparql.query().convert()
    for result in results["results"]["bindings"]:
        dbpedia = result["dbpedia"]["value"]

    return wikipedia, dbpedia


def _clear_folder(path):
    """Delete every entry directly inside ``path`` (no-op when it does not exist)."""
    if not os.path.isdir(path):
        return
    for filename in os.listdir(path):
        os.remove(os.path.join(path, filename))


def _run_entity_linking(client, model_name, sentence, mention, deep_search):
    """Execute the five pipeline modules for one sentence/mention pair."""
    with st.spinner("Applying Data Normalization module... (1/5)"):
        reply = _ask_model(
            client, model_name, _NORMALIZATION_PROMPT,
            f"label:{mention}, context:{sentence}",
        )
        # Fall back to the mention itself when the reply has no usable line.
        normalized = _parse_labelled_lines(reply) or [(mention, mention)]
        list_with_names_to_show = [shown for shown, _ in normalized]
        list_with_full_names = [full for _, full in normalized]

        name = ",".join(list_with_full_names)
        sentence = sentence.replace(mention, name)

        reply = _ask_model(
            client, model_name, _DESCRIPTION_PROMPT,
            f"label:{name}, context:{sentence}",
        )
        list_with_contexts = [text for _, text in _parse_labelled_lines(reply)]
    st.write("✅ Applied Data Normilzation module (1/5)")

    # The names are passed as a list (not joined and re-split on ",") so that
    # a name containing a comma stays in one piece.
    asyncio.run(_gather_candidates(list_with_full_names, deep_search))

    total = len(list_with_full_names)
    triples = zip(list_with_full_names, list_with_contexts, list_with_names_to_show)
    for number, (full_name, description, shown_name) in enumerate(triples, start=1):
        with st.spinner(f"Applying Candidate Selection module... (4/5) [{number}/{total}] (This may take a while due to limited resources)"):
            with open(f"/home/user/app/info_extraction/{full_name}.json", "r", encoding="utf-8") as f:
                json_file = json.load(f)
            my_bar = st.progress(0)
            ranked = _rank_candidates(full_name, description, json_file, my_bar)
            my_bar.empty()
        st.write(f"✅ Applined Candidate Selection module (4/5) [{number}/{total}]")

        with st.spinner(f"Applying Candidate Matching module... (5/5) [{number}/{total}]"):
            if ranked:
                link = ranked[0][0]
                qid = link.split("/")[-1]
                wikipedia, dbpedia = _linked_resources(qid)

                st.write(f"✅ Applied Candidate Matching module (5/5) [{number}/{total}]")
                st.text(f"The correct entity for '{shown_name}' is:")
                st.success(f"Wikipedia: {wikipedia}")
                st.success(f"Wikidata: {link}")
                st.success(f"DBpedia: {dbpedia}")
            else:
                st.warning(f"The entity: {shown_name} is NIL.")


def main_cli():
    """Render the Streamlit page and run the entity-linking pipeline on demand.

    The page asks for an API token, a sentence, a mention contained in that
    sentence and whether to perform a deep search. Pressing "Run Entity
    Linking" validates the inputs, runs the pipeline, reports the execution
    time and finally empties the two scratch directories.
    """
    st.title("✨ Entity Linking Application ✨")
    st.caption("This web application is part of my master’s dissertation.")

    # ``run_button`` is the key of the run button below; while it is set the
    # input widgets are rendered disabled.
    st.session_state.running = bool(
        'run_button' in st.session_state and st.session_state.run_button
    )

    api_token = st.text_input("Enter your API key from [GitHub](https://github.com/marketplace/models/azure-openai/gpt-4o):", "", type="password", disabled=st.session_state.running)

    client = None
    model_name = "gpt-4o"
    if api_token:
        endpoint = "https://models.inference.ai.azure.com"
        client = OpenAI(
            base_url=endpoint,
            api_key=api_token,
        )
        st.success("API Token is set for this session.")
    else:
        st.warning("Please enter an API token to proceed.")

    input_sentence_user = st.text_input("Enter a sentence:", "", disabled=st.session_state.running)
    input_mention_user = st.text_input("Enter a textural reference (mention) that is inside the sentence:", "", disabled=st.session_state.running)
    deep_search = st.selectbox("Perform deep search? (Useful for difficult mentions)", ['Yes', 'No'], index=1, disabled=st.session_state.running)

    if st.button("Run Entity Linking", key="run_button", disabled=st.session_state.running):
        try:
            if client is None:
                # Without a token there is no client to call; stop here
                # instead of failing with a NameError further down.
                st.error("An API token is required to run entity linking.")
            elif not (input_sentence_user and input_mention_user):
                st.warning("Please fill in both fields.")
            elif input_mention_user not in input_sentence_user:
                st.warning(f"The mention '{input_mention_user}' was NOT found in the sentence.")
            else:
                start_time = time.time()
                _run_entity_linking(client, model_name, input_sentence_user, input_mention_user, deep_search)
                # The timing is reported only when the pipeline actually ran.
                execution_time = time.time() - start_time
                ETA = time.strftime("%H:%M:%S", time.gmtime(execution_time))
                st.write(f"⌛ Execution time: {ETA}")

            st.button("Rerun", disabled=False)
        finally:
            # Remove the per-run files even when the pipeline failed, so stale
            # candidates never leak into the next run.
            _clear_folder(_QIDS_DIR)
            _clear_folder(_INFO_DIR)
| |
# Script entry point: build the page only when this file is executed directly.
if __name__ == "__main__":
    main_cli()