# openalex_mapper / openalex_utils.py
# Commit: "Migrate OpenAlex integration off PyAlex" — MaxNoichl (b38d551)
import ast
import json
import re
from datetime import datetime
import numpy as np
import pandas as pd
from openalex_client import get_openalex_client, normalize_openalex_url
def invert_abstract(inv_index):
    """Rebuild a plain-text abstract from an OpenAlex inverted index.

    Accepts either a dict mapping words to lists of token positions, or a
    string holding a JSON / Python-literal serialization of such a dict.
    Returns a single space when the input cannot be interpreted.
    """
    if isinstance(inv_index, str):
        # Try JSON first, then a Python-literal parse; give up with None.
        decoded = None
        for decoder in (json.loads, ast.literal_eval):
            try:
                decoded = decoder(inv_index)
            except Exception:
                continue
            break
        inv_index = decoded
    if not isinstance(inv_index, dict):
        return " "
    # Flatten to (position, word) pairs and order by position only
    # (stable sort keeps the original tie-breaking behavior).
    pairs = [(pos, word) for word, positions in inv_index.items() for pos in positions]
    pairs.sort(key=lambda pair: pair[0])
    return " ".join(word for _, word in pairs)
def get_pub(x):
    """Return the venue display name from a primary_location record.

    Falls back to a single space for missing/malformed records and for the
    sentinel names "parsed_publication" and "Deleted Journal".
    """
    try:
        name = x["source"]["display_name"]
    except Exception:
        return " "
    if name in ("parsed_publication", "Deleted Journal"):
        return " "
    return name
def get_field(x):
    """Return the subfield display name of the record's primary topic.

    Yields NaN when the record is missing, malformed, or the name is None,
    so downstream pandas code treats it as a missing value.
    """
    try:
        name = x["primary_topic"]["subfield"]["display_name"]
    except Exception:
        return np.nan
    return name if name is not None else np.nan
def process_records_to_df(records):
    """Normalize OpenAlex work records into the DataFrame shape the mapper expects.

    Accepts either a list of record dicts or an existing DataFrame (which is
    copied, never mutated). Guarantees that the columns "title", "abstract"
    and "parsed_publication" exist with no missing values, and deduplicates
    on "id" when that column is present.
    """
    frame = records.copy() if isinstance(records, pd.DataFrame) else pd.DataFrame(records)

    # Title: prefer an existing "title", then "display_name", then a blank.
    if "title" not in frame.columns:
        if "display_name" in frame.columns:
            frame["title"] = frame["display_name"]
        else:
            frame["title"] = " "

    # Abstract: reconstruct from the inverted index when available.
    if "abstract" not in frame.columns:
        if "abstract_inverted_index" in frame.columns:
            frame["abstract"] = [invert_abstract(entry) for entry in frame["abstract_inverted_index"]]
        else:
            frame["abstract"] = " "

    # Publication venue: extracted from the primary location record.
    if "parsed_publication" not in frame.columns:
        if "primary_location" in frame.columns:
            frame["parsed_publication"] = [get_pub(entry) for entry in frame["primary_location"]]
        else:
            frame["parsed_publication"] = " "

    # Blank out any remaining missing values in the guaranteed columns.
    for column in ("abstract", "parsed_publication", "title"):
        frame[column] = frame[column].fillna(" ")

    if "id" in frame.columns:
        frame = frame.drop_duplicates(subset=["id"])
    return frame.reset_index(drop=True)
def _clean_value(value):
clean_value = value.strip().strip("\"'")
clean_value = re.sub(r"[^\w\s-]", "", clean_value)
clean_value = " ".join(clean_value.split())
return clean_value
def _strip_quotes(value):
return value.strip().strip("\"'")
def openalex_url_to_filename(url):
    """Convert an OpenAlex URL to a filename-safe string with timestamp.

    Builds one ``key_value`` part per search / filter / sort component of
    the query, joins them with ``__``, and appends a ``YYYYMMDD_HHMMSS``
    timestamp. Falls back to the base name ``openalex_query`` when the URL
    carries no usable parts. The result is truncated to stay under common
    filesystem name-length limits.
    """
    query = normalize_openalex_url(url)
    parts = []
    if query.params.get("search"):
        search_value = _clean_value(query.params["search"]).replace(" ", "_")
        if search_value:
            parts.append(f"search_{search_value}")
    for token in query.filter_tokens:
        clean_key = token.key.replace(".", "_")
        clean_value = _clean_value(token.value).replace(" ", "_")
        if clean_value:
            parts.append(f"{clean_key}_{clean_value}")
    if query.params.get("sort"):
        for sort_value in query.params["sort"].split(","):
            # A leading "-" marks descending sort order in OpenAlex URLs.
            if sort_value.startswith("-"):
                parts.append(f"sort_{sort_value[1:].replace('.', '_')}_desc")
            else:
                parts.append(f"sort_{sort_value.replace('.', '_')}_asc")
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    base = "__".join(parts) if parts else "openalex_query"
    # Bug fix: the previous version unconditionally replaced the assembled
    # name with the literal "(unknown)__<timestamp>" (a leaked placeholder),
    # discarding every query part built above.
    filename = f"{base}__{timestamp}"
    if len(filename) > 255:
        filename = filename[:251]
    return filename
def get_records_from_dois(doi_list, block_size=50):
    """Download OpenAlex records for a list of DOIs in blocks.

    Fetches via the shared OpenAlex client in chunks of ``block_size`` DOIs
    and returns the combined records as a DataFrame.
    """
    records = get_openalex_client().fetch_records_from_dois(doi_list, block_size=block_size)
    return pd.DataFrame(records)
def _lookup_display_name(entity, entity_id):
    """Fetch an entity's display name from OpenAlex; None on any failure."""
    try:
        result = get_openalex_client().get_entity(entity, entity_id, select_fields=("display_name",))
    except Exception:
        # Best-effort lookup: network or API errors degrade to None.
        return None
    return result.get("display_name")
def _lookup_cited_work(entity_id):
    """Fetch a cited work's authorships and publication year; None on any failure."""
    try:
        return get_openalex_client().get_entity(
            "works", entity_id, select_fields=("authorships", "publication_year")
        )
    except Exception:
        # Best-effort lookup: network or API errors degrade to None.
        return None
def openalex_url_to_readable_name(url):
    """Convert an OpenAlex URL to a short, human-readable query description.

    Known filter keys get tailored labels; entity IDs (cited works,
    institutions, authors, sources, topics, concepts) are resolved to
    display names via best-effort API lookups. The result is truncated
    to at most 60 characters.
    """
    query = normalize_openalex_url(url)
    parts = []
    # publication_year is held back and appended last so a year range
    # always closes the description.
    year_range = None
    if query.params.get("search"):
        parts.append(f"Search: '{_strip_quotes(query.params['search'])}'")
    for token in query.filter_tokens:
        key = token.key
        value = token.value
        # Each token is handled independently; any failure (including a
        # failed lookup formatting step) just skips that token.
        try:
            if key == "title_and_abstract.search":
                parts.append(f"T&A: '{_strip_quotes(value)}'")
            elif key == "publication_year":
                year_range = value
            elif key == "cites":
                # Render as "Cites: <first author's last name> (<year>)".
                cited_work = _lookup_cited_work(value)
                if cited_work:
                    author_name = "Unknown"
                    authorships = cited_work.get("authorships") or []
                    if authorships:
                        first_author = authorships[0].get("author") or {}
                        display_name = first_author.get("display_name")
                        if display_name:
                            # Last whitespace-separated token as the surname.
                            author_name = display_name.split()[-1]
                    year = cited_work.get("publication_year") or "Unknown"
                    parts.append(f"Cites: {author_name} ({year})")
                else:
                    parts.append(f"Cites: Work {value}")
            # "|" joins OR-ed ID lists; only single IDs are resolved below.
            elif key == "authorships.institutions.lineage" and "|" not in value:
                institution_name = _lookup_display_name("institutions", value)
                parts.append(f"From: {institution_name or f'Institution {value}'}")
            elif key == "authorships.author.id" and "|" not in value:
                author_name = _lookup_display_name("authors", value)
                parts.append(f"By: {author_name or f'Author {value}'}")
            elif key == "primary_location.source.id" and "|" not in value:
                source_name = _lookup_display_name("sources", value)
                parts.append(f"In: {source_name or f'Source {value}'}")
            elif key == "topics.id" and "|" not in value:
                topic_name = _lookup_display_name("topics", value)
                parts.append(f"Topic: {topic_name or value}")
            elif key == "concepts.id" and "|" not in value:
                concept_name = _lookup_display_name("concepts", value)
                parts.append(f"Concept: {concept_name or value}")
            elif key == "type":
                # Friendly names for common work types; others are title-cased.
                type_mapping = {
                    "article": "Articles",
                    "book": "Books",
                    "book-chapter": "Book Chapters",
                    "dissertation": "Dissertations",
                    "preprint": "Preprints",
                }
                parts.append(f"Type: {type_mapping.get(value, value.replace('-', ' ').title())}")
            else:
                # Generic fallback: prettify the raw key/value pair.
                clean_key = key.replace("_", " ").replace(".", " ").title()
                clean_value = value.replace("_", " ")
                parts.append(f"{clean_key}: {clean_value}")
        except Exception:
            continue
    description = "OpenAlex Query" if not parts else ", ".join(parts)
    if year_range:
        # With other parts, append the years; alone, use a dedicated phrase.
        description = f"{description}, {year_range}" if parts else f"Works from {year_range}"
    if len(description) > 60:
        description = description[:57] + "..."
    return description