import ast
import json
import re
from datetime import datetime

import numpy as np
import pandas as pd

from openalex_client import get_openalex_client, normalize_openalex_url


def invert_abstract(inv_index):
    """Reconstruct an abstract string from OpenAlex's inverted index.

    The index maps each word to the list of positions where it occurs. It may
    arrive as a dict or as a serialized string (JSON, or a Python literal with
    single quotes). Anything unparseable yields " " so downstream text columns
    stay plain strings.
    """
    if isinstance(inv_index, str):
        # Some records store the index serialized: try JSON first, then a
        # Python literal, else treat as missing.
        try:
            inv_index = json.loads(inv_index)
        except Exception:
            try:
                inv_index = ast.literal_eval(inv_index)
            except Exception:
                inv_index = None
    if isinstance(inv_index, dict):
        # Flatten to (word, position) pairs and emit words in position order.
        inv_list = [(word, pos) for word, positions in inv_index.items() for pos in positions]
        return " ".join(word for word, _ in sorted(inv_list, key=lambda item: item[1]))
    return " "


def get_pub(x):
    """Extract the publication (source) display name from a primary_location record.

    Returns " " for missing or filtered-out sources so the column stays str.
    """
    try:
        source = x["source"]["display_name"]
        if source not in ["parsed_publication", "Deleted Journal"]:
            return source
        return " "
    except Exception:
        return " "


def get_field(x):
    """Extract the academic subfield name from a primary_topic record, or NaN."""
    try:
        field = x["primary_topic"]["subfield"]["display_name"]
        if field is not None:
            return field
        return np.nan
    except Exception:
        return np.nan


def process_records_to_df(records):
    """Convert OpenAlex records to a pandas DataFrame with the expected mapper fields.

    Ensures `title`, `abstract` and `parsed_publication` columns exist and are
    non-null, and de-duplicates on `id` when that column is present.

    Parameters
    ----------
    records : list[dict] | pd.DataFrame
        Raw OpenAlex work records (or an already-built frame, which is copied).

    Returns
    -------
    pd.DataFrame
        Frame with the mapper columns filled and the index reset.
    """
    if isinstance(records, pd.DataFrame):
        records_df = records.copy()
    else:
        records_df = pd.DataFrame(records)

    # OpenAlex uses `display_name` for the work title; fall back to blank.
    if "title" not in records_df.columns and "display_name" in records_df.columns:
        records_df["title"] = records_df["display_name"]
    if "title" not in records_df.columns:
        records_df["title"] = " "

    if "abstract" not in records_df.columns:
        if "abstract_inverted_index" in records_df.columns:
            records_df["abstract"] = [invert_abstract(value) for value in records_df["abstract_inverted_index"]]
        else:
            records_df["abstract"] = " "

    if "parsed_publication" not in records_df.columns:
        if "primary_location" in records_df.columns:
            records_df["parsed_publication"] = [get_pub(value) for value in records_df["primary_location"]]
        else:
            records_df["parsed_publication"] = " "

    # Downstream consumers expect strings, never NaN.
    records_df["abstract"] = records_df["abstract"].fillna(" ")
    records_df["parsed_publication"] = records_df["parsed_publication"].fillna(" ")
    records_df["title"] = records_df["title"].fillna(" ")

    if "id" in records_df.columns:
        records_df = records_df.drop_duplicates(subset=["id"]).reset_index(drop=True)
    else:
        records_df = records_df.reset_index(drop=True)
    return records_df


def _clean_value(value):
    """Strip quotes and punctuation, then collapse whitespace, for filename parts."""
    clean_value = value.strip().strip("\"'")
    clean_value = re.sub(r"[^\w\s-]", "", clean_value)
    clean_value = " ".join(clean_value.split())
    return clean_value


def _strip_quotes(value):
    """Remove surrounding whitespace and quote characters from a value."""
    return value.strip().strip("\"'")


def openalex_url_to_filename(url):
    """Convert an OpenAlex URL to a filename-safe string with timestamp.

    Builds a `__`-joined list of search/filter/sort descriptors, appends a
    second-resolution timestamp, and truncates to stay under common
    filesystem name-length limits.
    """
    query = normalize_openalex_url(url)
    parts = []
    if query.params.get("search"):
        search_value = _clean_value(query.params["search"]).replace(" ", "_")
        if search_value:
            parts.append(f"search_{search_value}")
    for token in query.filter_tokens:
        clean_key = token.key.replace(".", "_")
        clean_value = _clean_value(token.value).replace(" ", "_")
        if clean_value:
            parts.append(f"{clean_key}_{clean_value}")
    if query.params.get("sort"):
        for sort_value in query.params["sort"].split(","):
            if sort_value.startswith("-"):
                parts.append(f"sort_{sort_value[1:].replace('.', '_')}_desc")
            else:
                parts.append(f"sort_{sort_value.replace('.', '_')}_asc")
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    filename = "__".join(parts) if parts else "openalex_query"
    # BUG FIX: the computed name was previously discarded in favor of a
    # "(unknown)" placeholder; append the timestamp to the built name instead.
    filename = f"{filename}__{timestamp}"
    if len(filename) > 255:
        filename = filename[:251]
    return filename


def get_records_from_dois(doi_list, block_size=50):
    """Download OpenAlex records for a list of DOIs in blocks of `block_size`."""
    client = get_openalex_client()
    return pd.DataFrame(client.fetch_records_from_dois(doi_list, block_size=block_size))


def _lookup_display_name(entity, entity_id):
    """Fetch an entity's `display_name` from OpenAlex; None on any failure."""
    client = get_openalex_client()
    try:
        record = client.get_entity(entity, entity_id, select_fields=("display_name",))
    except Exception:
        return None
    return record.get("display_name")
def _lookup_cited_work(entity_id):
    """Fetch authorships and publication year for a cited work; None on failure."""
    client = get_openalex_client()
    try:
        return client.get_entity("works", entity_id, select_fields=("authorships", "publication_year"))
    except Exception:
        return None


def openalex_url_to_readable_name(url):
    """Convert an OpenAlex URL to a short, human-readable query description."""
    query = normalize_openalex_url(url)
    pieces = []
    year_filter = None

    search_term = query.params.get("search")
    if search_term:
        pieces.append(f"Search: '{_strip_quotes(search_term)}'")

    # Single-valued id filters: key -> (label, OpenAlex entity, fallback prefix).
    id_filters = {
        "authorships.institutions.lineage": ("From", "institutions", "Institution "),
        "authorships.author.id": ("By", "authors", "Author "),
        "primary_location.source.id": ("In", "sources", "Source "),
        "topics.id": ("Topic", "topics", ""),
        "concepts.id": ("Concept", "concepts", ""),
    }
    type_labels = {
        "article": "Articles",
        "book": "Books",
        "book-chapter": "Book Chapters",
        "dissertation": "Dissertations",
        "preprint": "Preprints",
    }

    for token in query.filter_tokens:
        key, value = token.key, token.value
        try:
            if key == "title_and_abstract.search":
                pieces.append(f"T&A: '{_strip_quotes(value)}'")
            elif key == "publication_year":
                # Rendered at the end so it reads as a suffix.
                year_filter = value
            elif key == "cites":
                work = _lookup_cited_work(value)
                if work:
                    surname = "Unknown"
                    author_list = work.get("authorships") or []
                    if author_list:
                        full_name = (author_list[0].get("author") or {}).get("display_name")
                        if full_name:
                            surname = full_name.split()[-1]
                    year = work.get("publication_year") or "Unknown"
                    pieces.append(f"Cites: {surname} ({year})")
                else:
                    pieces.append(f"Cites: Work {value}")
            elif key in id_filters and "|" not in value:
                label, entity, fallback_prefix = id_filters[key]
                display = _lookup_display_name(entity, value)
                pieces.append(f"{label}: {display or fallback_prefix + value}")
            elif key == "type":
                pieces.append(f"Type: {type_labels.get(value, value.replace('-', ' ').title())}")
            else:
                # Generic fallback: humanize the raw key/value pair.
                readable_key = key.replace("_", " ").replace(".", " ").title()
                pieces.append(f"{readable_key}: {value.replace('_', ' ')}")
        except Exception:
            # Best-effort rendering: a failed token never aborts the summary.
            continue

    description = ", ".join(pieces) if pieces else "OpenAlex Query"
    if year_filter:
        description = f"{description}, {year_filter}" if pieces else f"Works from {year_filter}"
    if len(description) > 60:
        description = description[:57] + "..."
    return description