#!/usr/bin/env python3
"""
EHDS MCP Server

An MCP server exposing EHDS-compliant health data space assets to LLM agents.
Provides structured access to HealthDCAT-AP catalogue metadata and
FHIR-on-RDF clinical data.
"""

import asyncio
import json
import logging
import os
import pathlib
import re
import sys
from typing import Any

from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import Tool, TextContent
from SPARQLWrapper import SPARQLWrapper, JSON

# The dataset registry module lives one directory above this file, so that
# directory has to be on sys.path before ``datasets`` can be imported.
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
from datasets import DATASETS  # noqa: E402

# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------

SPARQL_ENDPOINT = "http://localhost:48242/ehds/sparql"
GRAPH_CATALOGUE = "https://ehds-prototype.example.org/graph/catalogue"

# Dataset URI -> named graph URI, built from the shared dataset registry.
DATASET_GRAPHS = {v["uri"]: v["graph"] for v in DATASETS.values()}

# NOTE(review): every PREFIX declaration below is missing its <IRI>; as
# written, any query that has this header prepended is not valid SPARQL.
# The namespace IRIs for the project-specific prefixes cannot be determined
# from this file, so the text is left untouched -- restore them from the
# original source before running.
PREFIXES = """
PREFIX fhir:
PREFIX dcat:
PREFIX dct:
PREFIX hdcat:
PREFIX odrl:
PREFIX xsd:
PREFIX ehds:
"""

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("ehds-mcp")

# ---------------------------------------------------------------------------
# SPARQL helpers
# ---------------------------------------------------------------------------

# Characters the SPARQL grammar forbids inside an IRIREF (<...>).
_IRI_FORBIDDEN = re.compile(r'[\x00-\x20<>"{}|^`\\]')


def _iri(value: str) -> str:
    """Return *value* wrapped as a SPARQL IRI reference.

    Raises:
        ValueError: if *value* is empty, not a string, or contains a
            character that could terminate the ``<...>`` token and thereby
            inject query text.
    """
    if not isinstance(value, str) or not value or _IRI_FORBIDDEN.search(value):
        raise ValueError(f"Invalid IRI: {value!r}")
    return f"<{value}>"


def _literal(value: str) -> str:
    """Return *value* as a double-quoted, escaped SPARQL string literal."""
    escaped = (
        str(value)
        .replace("\\", "\\\\")
        .replace('"', '\\"')
        .replace("\n", "\\n")
        .replace("\r", "\\r")
    )
    return f'"{escaped}"'


def sparql_query(query: str) -> list[dict]:
    """Execute a SPARQL SELECT query and return bindings as list of dicts.

    ``PREFIXES`` is prepended to *query*. Each returned dict maps a variable
    name to the plain ``value`` of its binding; datatype and language
    information from the JSON result is discarded.
    """
    sparql = SPARQLWrapper(SPARQL_ENDPOINT)
    sparql.setQuery(PREFIXES + "\n" + query)
    sparql.setReturnFormat(JSON)
    results = sparql.query().convert()
    bindings = results.get("results", {}).get("bindings", [])
    return [{k: v["value"] for k, v in row.items()} for row in bindings]


def get_clinical_graph(dataset_uri: str) -> str | None:
    """Resolve a dataset URI to its named graph URI.

    Resolution order: exact match in ``DATASET_GRAPHS``, then a match on the
    last path segment of a registered URI (for abbreviated URIs), then a
    lookup of the distribution's ``hdcat:namedGraph`` in the catalogue graph.
    Returns ``None`` when nothing matches.
    """
    if not dataset_uri:
        return None

    # Direct lookup first
    if dataset_uri in DATASET_GRAPHS:
        return DATASET_GRAPHS[dataset_uri]

    # Suffix match for abbreviated URIs
    for known_uri, graph in DATASET_GRAPHS.items():
        suffix = known_uri.split("/")[-1]
        # A registered URI ending in "/" yields an empty suffix, which every
        # string "ends with"; skip it instead of matching everything.
        if suffix and dataset_uri.endswith(suffix):
            return graph

    # Fallback: query the catalogue. A value that is not a well-formed IRI
    # cannot name a catalogue resource, so it is reported as unresolvable
    # rather than being interpolated into the query.
    try:
        subject = _iri(dataset_uri)
    except ValueError:
        return None
    rows = sparql_query(f"""
        SELECT ?graph WHERE {{
            GRAPH <{GRAPH_CATALOGUE}> {{
                {subject} dcat:distribution ?dist .
                ?dist hdcat:namedGraph ?graph .
            }}
        }}""")
    return rows[0]["graph"] if rows else None


# ---------------------------------------------------------------------------
# MCP Server
# ---------------------------------------------------------------------------

app = Server("ehds-mcp")


@app.list_tools()
async def list_tools() -> list[Tool]:
    """Advertise the tools this server exposes, with their JSON schemas."""
    return [
        Tool(
            name="ehds_list_datasets",
            description=(
                "List all datasets in the EHDS catalogue with their titles, descriptions, "
                "population sizes, health categories, and access conditions."
            ),
            inputSchema={
                "type": "object",
                "properties": {},
                "required": [],
            },
        ),
        Tool(
            name="ehds_describe_dataset",
            description=(
                "Get full HealthDCAT-AP metadata for a specific dataset, including temporal "
                "coverage, keywords, data standard, and distribution details."
            ),
            inputSchema={
                "type": "object",
                "properties": {
                    "dataset_uri": {
                        "type": "string",
                        "description": "The URI of the dataset to describe",
                    },
                },
                "required": ["dataset_uri"],
            },
        ),
        Tool(
            name="ehds_check_policy",
            description=(
                "Return the ODRL usage policy for a dataset: permitted purposes, "
                "prohibitions, and obligations."
            ),
            inputSchema={
                "type": "object",
                "properties": {
                    "dataset_uri": {
                        "type": "string",
                        "description": "The URI of the dataset",
                    },
                },
                "required": ["dataset_uri"],
            },
        ),
        Tool(
            name="ehds_search_datasets",
            description=(
                "Search the catalogue for datasets matching a keyword or health category. "
                "Returns matching dataset URIs and titles."
            ),
            inputSchema={
                "type": "object",
                "properties": {
                    "keyword": {
                        "type": "string",
                        "description": "Keyword to search for in dataset titles and descriptions",
                    },
                },
                "required": ["keyword"],
            },
        ),
        Tool(
            name="ehds_get_patients",
            description=(
                "List patients in a dataset with key demographic attributes: "
                "gender, birth date, and city."
            ),
            inputSchema={
                "type": "object",
                "properties": {
                    "dataset_uri": {
                        "type": "string",
                        "description": "The URI of the dataset",
                    },
                    "limit": {
                        "type": "integer",
                        "description": "Maximum number of patients to return (default 10)",
                        "default": 10,
                    },
                },
                "required": ["dataset_uri"],
            },
        ),
        Tool(
            name="ehds_get_condition_stats",
            description=(
                "Get aggregate statistics for a SNOMED condition in a dataset: "
                "patient count, gender breakdown, and age distribution."
            ),
            inputSchema={
                "type": "object",
                "properties": {
                    "dataset_uri": {
                        "type": "string",
                        "description": "The URI of the dataset",
                    },
                    "snomed_code": {
                        "type": "string",
                        "description": "SNOMED CT code for the condition (e.g. '44054006' for Type 2 Diabetes)",
                    },
                },
                "required": ["dataset_uri", "snomed_code"],
            },
        ),
        Tool(
            name="ehds_query_clinical",
            description=(
                "Execute a SPARQL SELECT query against the clinical data of a specific dataset. "
                "Use this for custom queries not covered by other tools. "
                "The named graph for the dataset is automatically injected."
            ),
            inputSchema={
                "type": "object",
                "properties": {
                    "dataset_uri": {
                        "type": "string",
                        "description": "The URI of the dataset to query",
                    },
                    "sparql_query": {
                        "type": "string",
                        "description": "SPARQL SELECT query. Use GRAPH ?g { ... } pattern. The graph URI will be provided.",
                    },
                },
                "required": ["dataset_uri", "sparql_query"],
            },
        ),
    ]


@app.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
    """Run the named tool and return its result as pretty-printed JSON text.

    Any exception is logged with its traceback and reported to the client as
    a JSON object with a single ``error`` key instead of propagating.
    """
    try:
        result = await dispatch_tool(name, arguments)
        return [TextContent(type="text", text=json.dumps(result, indent=2))]
    except Exception as e:
        # Top-level boundary: the client must always receive a well-formed
        # response, so the failure is logged here and returned as data.
        logger.exception("Tool %s failed", name)
        return [TextContent(type="text", text=json.dumps({"error": str(e)}))]


def _require(args: dict, key: str) -> Any:
    """Return ``args[key]`` or raise a ValueError naming the missing argument."""
    try:
        return args[key]
    except KeyError:
        raise ValueError(f"Missing required argument: {key}") from None


async def dispatch_tool(name: str, args: dict) -> Any:
    """Route a tool call to its implementation.

    The implementations perform blocking HTTP requests to the SPARQL
    endpoint, so they are executed in a worker thread to keep the event loop
    free for other connections.

    Raises:
        ValueError: for an unknown tool name or a missing required argument.
    """
    args = args or {}
    handlers = {
        "ehds_list_datasets": lambda: tool_list_datasets(),
        "ehds_describe_dataset": lambda: tool_describe_dataset(
            _require(args, "dataset_uri")
        ),
        "ehds_check_policy": lambda: tool_check_policy(
            _require(args, "dataset_uri")
        ),
        "ehds_search_datasets": lambda: tool_search_datasets(
            _require(args, "keyword")
        ),
        "ehds_get_patients": lambda: tool_get_patients(
            _require(args, "dataset_uri"), args.get("limit", 10)
        ),
        "ehds_get_condition_stats": lambda: tool_get_condition_stats(
            _require(args, "dataset_uri"), _require(args, "snomed_code")
        ),
        "ehds_query_clinical": lambda: tool_query_clinical(
            _require(args, "dataset_uri"), _require(args, "sparql_query")
        ),
    }
    handler = handlers.get(name)
    if handler is None:
        raise ValueError(f"Unknown tool: {name}")
    return await asyncio.to_thread(handler)


# ---------------------------------------------------------------------------
# Tool implementations
# ---------------------------------------------------------------------------

def tool_list_datasets() -> dict:
    """List every ``dcat:Dataset`` in the catalogue graph.

    Title and description are required; population size, health category and
    licence are included when present.
    """
    rows = sparql_query(f"""
        SELECT ?dataset ?title ?description ?population ?healthCategory ?license
        WHERE {{
            GRAPH <{GRAPH_CATALOGUE}> {{
                ?dataset a dcat:Dataset ;
                         dct:title ?title ;
                         dct:description ?description .
                OPTIONAL {{ ?dataset hdcat:populationSize ?population . }}
                OPTIONAL {{ ?dataset hdcat:healthCategory ?healthCategory . }}
                OPTIONAL {{ ?dataset dct:license ?license . }}
            }}
        }}""")
    return {"datasets": rows, "count": len(rows)}


def tool_describe_dataset(dataset_uri: str) -> dict:
    """Return all catalogue properties, distributions and temporal coverage
    recorded for *dataset_uri*.

    Property keys are the local names of the predicates. A predicate that
    occurs once maps to its value; a predicate that occurs several times
    maps to a list of its values, so repeated properties are not lost.

    Raises:
        ValueError: if *dataset_uri* is not a well-formed IRI.
    """
    subject = _iri(dataset_uri)

    rows = sparql_query(f"""
        SELECT ?p ?o WHERE {{
            GRAPH <{GRAPH_CATALOGUE}> {{
                {subject} ?p ?o .
            }}
        }}""")
    if not rows:
        return {"error": f"Dataset not found: {dataset_uri}"}

    # Also get distribution details
    dist_rows = sparql_query(f"""
        SELECT ?distTitle ?accessURL ?namedGraph ?format WHERE {{
            GRAPH <{GRAPH_CATALOGUE}> {{
                {subject} dcat:distribution ?dist .
                OPTIONAL {{ ?dist dct:title ?distTitle . }}
                OPTIONAL {{ ?dist dcat:accessURL ?accessURL . }}
                OPTIONAL {{ ?dist hdcat:namedGraph ?namedGraph . }}
                OPTIONAL {{ ?dist dct:format ?format . }}
            }}
        }}""")

    # Get temporal coverage
    temporal_rows = sparql_query(f"""
        SELECT ?start ?end WHERE {{
            GRAPH <{GRAPH_CATALOGUE}> {{
                {subject} dct:temporal ?period .
                OPTIONAL {{ ?period dcat:startDate ?start . }}
                OPTIONAL {{ ?period dcat:endDate ?end . }}
            }}
        }}""")

    props: dict[str, Any] = {}
    for row in rows:
        # Reduce the predicate IRI to its local name (after the last / or #).
        key = row["p"].split("/")[-1].split("#")[-1]
        value = row["o"]
        if key not in props:
            props[key] = value
        elif isinstance(props[key], list):
            props[key].append(value)
        else:
            props[key] = [props[key], value]

    return {
        "dataset_uri": dataset_uri,
        "properties": props,
        "distributions": dist_rows,
        "temporal_coverage": temporal_rows[0] if temporal_rows else {},
    }


def tool_check_policy(dataset_uri: str) -> dict:
    """Return the ODRL policy attached to *dataset_uri*.

    Only the first policy row found is reported, together with its
    permissions, prohibitions and obligations.

    Raises:
        ValueError: if *dataset_uri* (or the policy identifier returned by
            the endpoint) is not a well-formed IRI.
    """
    subject = _iri(dataset_uri)

    rows = sparql_query(f"""
        SELECT ?policy ?policyTitle ?policyDescription WHERE {{
            GRAPH <{GRAPH_CATALOGUE}> {{
                {subject} odrl:hasPolicy ?policy .
                OPTIONAL {{ ?policy dct:title ?policyTitle . }}
                OPTIONAL {{ ?policy dct:description ?policyDescription . }}
            }}
        }}""")
    if not rows:
        return {"error": f"No policy found for dataset: {dataset_uri}"}

    policy_uri = rows[0].get("policy", "")
    # NOTE(review): if the policy is a blank node, its label is not a usable
    # IRI and the follow-up queries below will match nothing -- confirm that
    # policies in the catalogue are always named resources.
    policy = _iri(policy_uri)

    # Get permissions
    perm_rows = sparql_query(f"""
        SELECT ?action ?purposeValue WHERE {{
            GRAPH <{GRAPH_CATALOGUE}> {{
                {policy} odrl:permission ?perm .
                ?perm odrl:action ?action .
                OPTIONAL {{
                    ?perm odrl:constraint ?c .
                    ?c odrl:rightOperand ?purposeValue .
                }}
            }}
        }}""")

    # Get prohibitions
    prohib_rows = sparql_query(f"""
        SELECT ?action ?purposeValue WHERE {{
            GRAPH <{GRAPH_CATALOGUE}> {{
                {policy} odrl:prohibition ?prohib .
                ?prohib odrl:action ?action .
                OPTIONAL {{
                    ?prohib odrl:constraint ?c .
                    ?c odrl:rightOperand ?purposeValue .
                }}
            }}
        }}""")

    # Get obligations
    oblig_rows = sparql_query(f"""
        SELECT ?action WHERE {{
            GRAPH <{GRAPH_CATALOGUE}> {{
                {policy} odrl:obligation ?oblig .
                ?oblig odrl:action ?action .
            }}
        }}""")

    return {
        "policy_uri": policy_uri,
        "title": rows[0].get("policyTitle", ""),
        "description": rows[0].get("policyDescription", ""),
        "permissions": perm_rows,
        "prohibitions": prohib_rows,
        "obligations": oblig_rows,
    }


def tool_search_datasets(keyword: str) -> dict:
    """Find datasets whose title or description contains *keyword*.

    The comparison is case-insensitive. The keyword is escaped before being
    embedded in the query, so quotes and backslashes are matched literally.
    """
    needle = _literal(keyword.lower())
    rows = sparql_query(f"""
        SELECT ?dataset ?title ?description WHERE {{
            GRAPH <{GRAPH_CATALOGUE}> {{
                ?dataset a dcat:Dataset ;
                         dct:title ?title ;
                         dct:description ?description .
                FILTER (
                    CONTAINS(LCASE(STR(?title)), {needle}) ||
                    CONTAINS(LCASE(STR(?description)), {needle})
                )
            }}
        }}""")
    return {"keyword": keyword, "matches": rows, "count": len(rows)}


def tool_get_patients(dataset_uri: str, limit: int = 10) -> dict:
    """List up to *limit* patients from the dataset's clinical graph.

    Gender, birth date and city are included when present.

    Raises:
        ValueError: if *limit* is not a non-negative integer.
    """
    graph = get_clinical_graph(dataset_uri)
    if not graph:
        return {"error": f"Cannot resolve clinical graph for: {dataset_uri}"}

    # The limit is spliced into the query text, so it must be a real integer.
    if limit is None:
        limit = 10
    try:
        limit = int(limit)
    except (TypeError, ValueError):
        raise ValueError(f"limit must be an integer, got {limit!r}") from None
    if limit < 0:
        raise ValueError("limit must be non-negative")

    rows = sparql_query(f"""
        SELECT ?patient ?gender ?birthDate ?city WHERE {{
            GRAPH {_iri(graph)} {{
                ?patient a fhir:Patient .
                OPTIONAL {{ ?patient fhir:gender ?gender . }}
                OPTIONAL {{ ?patient fhir:birthDate ?birthDate . }}
                OPTIONAL {{
                    ?patient fhir:address ?addr .
                    ?addr fhir:city ?city .
                }}
            }}
        }}
        LIMIT {limit}""")

    return {
        "dataset_uri": dataset_uri,
        "clinical_graph": graph,
        "patients": rows,
        "count": len(rows),
    }


def tool_get_condition_stats(dataset_uri: str, snomed_code: str) -> dict:
    """Return the patient count, gender breakdown and display name for the
    condition coded *snomed_code* in the dataset's clinical graph.

    ``patient_count`` is the string value returned by the endpoint, or
    ``"0"`` when the count query yields no row. When no display name is
    found, the code itself is reported as the display.
    """
    graph = get_clinical_graph(dataset_uri)
    if not graph:
        return {"error": f"Cannot resolve clinical graph for: {dataset_uri}"}

    graph_ref = _iri(graph)
    code = _literal(snomed_code)

    # Patient count with this condition
    count_rows = sparql_query(f"""
        SELECT (COUNT(DISTINCT ?patient) AS ?count) WHERE {{
            GRAPH {graph_ref} {{
                ?condition a fhir:Condition ;
                           fhir:code ?coding ;
                           fhir:subject ?patient .
                ?coding fhir:code {code} .
            }}
        }}""")

    # Gender breakdown
    gender_rows = sparql_query(f"""
        SELECT ?gender (COUNT(DISTINCT ?patient) AS ?count) WHERE {{
            GRAPH {graph_ref} {{
                ?condition a fhir:Condition ;
                           fhir:code ?coding ;
                           fhir:subject ?patient .
                ?coding fhir:code {code} .
                ?patient fhir:gender ?gender .
            }}
        }}
        GROUP BY ?gender""")

    # Condition display name
    display_rows = sparql_query(f"""
        SELECT DISTINCT ?display WHERE {{
            GRAPH {graph_ref} {{
                ?condition a fhir:Condition ;
                           fhir:code ?coding .
                ?coding fhir:code {code} ;
                        fhir:display ?display .
            }}
        }}
        LIMIT 1""")

    return {
        "dataset_uri": dataset_uri,
        "snomed_code": snomed_code,
        "condition_display": display_rows[0]["display"] if display_rows else snomed_code,
        "patient_count": count_rows[0]["count"] if count_rows else "0",
        "gender_breakdown": gender_rows,
    }


def tool_query_clinical(dataset_uri: str, sparql_query_str: str) -> dict:
    """Run a caller-supplied SPARQL query for a dataset.

    The resolved graph URI is prepended to the query as a SPARQL comment and
    returned in the result under ``clinical_graph``. Query failures are
    returned as an ``error`` entry rather than raised.

    NOTE(review): the graph is only annotated, not enforced -- the query is
    sent as written and may address any graph on the endpoint, although the
    tool description says the graph is "automatically injected". Confirm
    whether the dataset should be restricted at the protocol level.
    """
    graph = get_clinical_graph(dataset_uri)
    if not graph:
        return {"error": f"Cannot resolve clinical graph for: {dataset_uri}"}

    # Inject graph URI as a comment so the LLM knows which graph to use
    annotated_query = f"# Clinical graph: {graph}\n{sparql_query_str}"

    try:
        rows = sparql_query(annotated_query)
    except Exception as e:
        # Deliberate best-effort: the caller wrote the query, so the
        # endpoint's message is handed back together with the graph URI.
        return {"error": str(e), "clinical_graph": graph}
    return {
        "dataset_uri": dataset_uri,
        "clinical_graph": graph,
        "results": rows,
        "count": len(rows),
    }


# ---------------------------------------------------------------------------
# Entry point — stdio (local) or SSE/HTTP (remote)
# ---------------------------------------------------------------------------

async def main_stdio():
    """Serve the MCP app over stdin/stdout."""
    async with stdio_server() as (read_stream, write_stream):
        await app.run(read_stream, write_stream, app.create_initialization_options())


# ---------------------------------------------------------------------------
# Static pages served in SSE/HTTP mode
# ---------------------------------------------------------------------------
# NOTE(review): the three page constants below contain text only, with no
# HTML markup, although they are served through HTMLResponse -- presumably
# the tags were lost somewhere; confirm against the deployed pages.

LANDING_HTML = """ EHDS Linked Health Data Portal

EHDS Linked Health Data Portal

First open EHDS-compliant linked health data resource for LLM agent evaluation

HealthDCAT-AP R5 FHIR R4 on RDF ODRL policies MCP connector CC-BY-4.0
500+
Unique patients
21.2M
RDF triples
30
Clinical cohorts
7
MCP tools
50
Benchmark queries

MCP Connector

Connect any MCP-compatible AI agent — Claude, mcphost, or Python SDK — to query the data space.

Connect via SSE

SPARQL Explorer

Write and run SPARQL queries against the HealthDCAT-AP catalogue and FHIR-on-RDF clinical data.

Open SPARQL UI

Supplements

Benchmark queries, evaluation harness, FHIR-to-RDF pipeline, and annotation guide — all files from the paper.

Browse files

RAG Vector Store

Download the pre-built ChromaDB vector store for RAG-condition evaluation.

Download ChromaDB

Knowledge Graph

Explore the EHDS data space interactively — datasets, ODRL policies, and clinical relationships visualised as a force-directed graph.

Open Explorer

Datasets

Graph Condition SNOMED CT Patients
graph/diabetesType 2 Diabetes Mellitus4405400640
graph/hypertensionEssential Hypertension5962100040
graph/metabolic-syndromeMetabolic Syndrome23760200740
graph/obesityObesity16286400540
graph/hyperlipidemiaHyperlipidemia5582200440
graph/prediabetesPrediabetes71462800240
graph/hypothyroidismHypothyroidism8366400640
graph/anemiaAnemia27173700040
graph/heart-failureHeart Failure8880500915
graph/strokeStroke23069000740
graph/myocardial-infarctionMyocardial Infarction2229800640
graph/ischemic-heart-diseaseIschaemic Heart Disease41454500840
graph/atrial-fibrillationAtrial Fibrillation4943600440
graph/dementiaDementia524480060
graph/anxietyAnxiety8058300710
graph/ptsdPTSD4750500310
graph/alzheimersAlzheimer's Disease2692900440
graph/osteoporosisOsteoporosis6485900640
graph/rheumatoid-arthritisRheumatoid Arthritis6989600438
graph/chronic-kidney-diseaseChronic Kidney Disease43185500540
graph/asthmaAsthma19596700128
graph/copdCOPD8743300140
graph/sleep-apneaSleep Apnea7343000640
graph/utiUrinary Tract Infection19792700140
graph/breast-cancerBreast Cancer25483700910
graph/prostate-cancerProstate Cancer12690600610
graph/colorectal-cancerColorectal Cancer36340600510
graph/osteoarthritisOsteoarthritis5767600240
graph/substance-use-disorderSubstance Use Disorder652500210
graph/chronic-painChronic Pain8242300110
graph/catalogue HealthDCAT-AP Release 5 metadata catalogue

Quick Start

Claude.ai: Settings → Connectors → Add custom connector

https://mcp.linkeddata.es/connector

mcphost:

echo '{"mcpServers":{"ehds":{"url":"https://mcp.linkeddata.es/connector"}}}' > ~/.mcp.json
mcphost -m ollama:llama3.2

Python:

from mcp import ClientSession
from mcp.client.sse import sse_client

async with sse_client("https://mcp.linkeddata.es/connector") as (r, w):
    async with ClientSession(r, w) as session:
        await session.initialize()
        result = await session.call_tool("ehds_list_datasets", {})

Citation

Manab et al. (2026). EHDS Linked Health Data Portal.
ISWC 2026 Resource Track. https://mcp.linkeddata.es
Ontology Engineering Group, Universidad Politécnica de Madrid  |  HARNESS Project, Horizon Europe 101169409  |  GitHub
"""

SPARQL_HTML = """ SPARQL Explorer — EHDS Portal
← EHDS Portal

SPARQL Explorer

"""

# NOTE(review): this page refers to "ISWC 2025" while the citation in
# LANDING_HTML says "ISWC 2026" -- confirm which year is correct.
SUPPLEMENTS_HTML = """ Supplements — EHDS Portal
← EHDS Portal

Supplements

Supplementary materials for the ISWC 2025 Resource Track paper. All files are part of the ehds-linked-data-portal repository and released under Creative Commons Attribution 4.0 International licence.

📄
Dataset Description
30 clinical cohorts, ODRL policies, HealthDCAT-AP catalogue schema, named graph structure, and infrastructure overview
dataset_description.pdf
📄
Annotation Guide
Atomic fact extraction protocol, completeness and hallucination scoring rules, and five worked examples with highlighted responses
annotation_guide.pdf
🐍
evaluation_50_queries.py
Runs all 50 benchmark queries under baseline, RAG, and MCP conditions; writes per-query JSON results
eval/evaluation_50_queries.py
🐍
benchmark.py
50 benchmark queries with embedded SPARQL ground-truth derivations; authoritative source for all ground truth values
eval/benchmark.py
📊
benchmark.csv
Flat CSV version of the benchmark ground truth, one row per query
eval/benchmark.csv
🐍
fhir_to_rdf.py
Converts Synthea FHIR R4 JSON bundles to RDF Turtle, preserving Patient, Condition, Observation, MedicationRequest, and Encounter resources
fhir_to_rdf.py
🐍
server.py
MCP server exposing seven typed tools over SSE; compatible with Claude, mcphost, DeepSeek, and the Python MCP SDK
mcp/server.py
"""

# ---------------------------------------------------------------------------
# Supplement downloads
# ---------------------------------------------------------------------------

# Absolute path to the repository root (one level up from mcp/)
REPO_ROOT = pathlib.Path("/home/meem/ehds-linked-data-portal")

# Map URL filename → filesystem path relative to repo root.
# Add any future supplement files here. Only names listed here can be
# downloaded; the requested filename is never joined onto a path.
SUPPLEMENT_FILES = {
    "dataset_description.pdf": REPO_ROOT / "dataset_description.pdf",
    "annotation_guide.pdf": REPO_ROOT / "annotation_guide.pdf",
    "evaluation_50_queries.py": REPO_ROOT / "eval" / "evaluation_50_queries.py",
    "benchmark.py": REPO_ROOT / "eval" / "benchmark.py",
    "benchmark.csv": REPO_ROOT / "eval" / "benchmark.csv",
    "fhir_to_rdf.py": REPO_ROOT / "fhir_to_rdf.py",
    "server.py": REPO_ROOT / "mcp" / "server.py",
}

# File suffix -> Content-Type used when serving a supplement.
MEDIA_TYPES = {
    ".pdf": "application/pdf",
    ".py": "text/plain; charset=utf-8",
    ".csv": "text/csv; charset=utf-8",
}


def main_sse(host: str = "0.0.0.0", port: int = 48243):
    """Serve the MCP app over SSE together with the portal's HTTP pages.

    Routes: landing page, SPARQL proxy and UI, the MCP SSE connector and its
    message endpoint, supplement browser and downloads, the RAG vector-store
    download, and the visualization page. Blocks until the server stops.
    """
    # Imported here so that stdio mode does not need the HTTP stack.
    import shutil

    import httpx
    import uvicorn
    from mcp.server.sse import SseServerTransport
    from starlette.applications import Starlette
    from starlette.routing import Route, Mount
    from starlette.responses import HTMLResponse, FileResponse, Response
    from visualization import VIZ_HTML

    sse = SseServerTransport("/messages/")

    async def supplements_browser(request):
        return HTMLResponse(SUPPLEMENTS_HTML)

    async def supplements_file(request):
        filename = request.path_params["filename"]
        filepath = SUPPLEMENT_FILES.get(filename)
        if filepath is None or not filepath.exists():
            return Response("Not found", status_code=404)
        media_type = MEDIA_TYPES.get(
            filepath.suffix.lower(), "application/octet-stream"
        )
        return FileResponse(str(filepath), media_type=media_type, filename=filename)

    async def handle_sse(request):
        async with sse.connect_sse(
            request.scope, request.receive, request._send
        ) as streams:
            await app.run(streams[0], streams[1], app.create_initialization_options())
        # A Starlette endpoint must return a response object; returning None
        # raises a TypeError once the client disconnects.
        return Response()

    async def landing(request):
        return HTMLResponse(LANDING_HTML)

    async def sparql_ui(request):
        return HTMLResponse(SPARQL_HTML)

    async def sparql_proxy(request):
        # Forward the GET query string to the SPARQL endpoint and relay the
        # upstream body, status and content type unchanged.
        params = dict(request.query_params)
        accept = request.headers.get("Accept", "application/sparql-results+json")
        try:
            async with httpx.AsyncClient(timeout=60.0) as client:
                resp = await client.get(
                    SPARQL_ENDPOINT,
                    params=params,
                    headers={"Accept": accept},
                )
        except httpx.HTTPError as exc:
            logger.error("SPARQL proxy request failed: %s", exc)
            return Response("SPARQL endpoint unavailable", status_code=502)
        return Response(
            content=resp.content,
            status_code=resp.status_code,
            media_type=resp.headers.get("content-type", "application/json"),
        )

    async def rag_download(request):
        zip_path = pathlib.Path("/home/meem/ehds-mcp/rag/chroma_db.zip")
        db_path = pathlib.Path("/home/meem/ehds-mcp/rag/chroma_db")
        if not zip_path.exists():
            if not db_path.is_dir():
                return Response("Not found", status_code=404)
            logger.info("Building chroma_db.zip for download...")
            # Archiving can take a while; keep it off the event loop.
            await asyncio.to_thread(
                shutil.make_archive,
                str(zip_path.with_suffix("")),
                "zip",
                str(db_path),
            )
        return FileResponse(
            str(zip_path),
            media_type="application/zip",
            filename="ehds_chroma_db.zip",
        )

    async def visualization(request):
        return HTMLResponse(VIZ_HTML)

    starlette_app = Starlette(
        routes=[
            Route("/", endpoint=landing),
            Route("/sparql", endpoint=sparql_proxy),
            Route("/sparql/", endpoint=sparql_ui),
            Route("/connector", endpoint=handle_sse),
            Route("/supplements", endpoint=supplements_browser),
            Route("/supplements/{filename}", endpoint=supplements_file),
            Route("/rag", endpoint=rag_download),
            Mount("/messages/", app=sse.handle_post_message),
            Route("/visualization", endpoint=visualization),
        ]
    )

    logger.info(f"Starting EHDS MCP server (SSE) on {host}:{port}")
    uvicorn.run(starlette_app, host=host, port=port)


if __name__ == "__main__":
    if "--sse" in sys.argv:
        port = 48243
        for arg in sys.argv:
            if arg.startswith("--port="):
                port = int(arg.split("=", 1)[1])
        main_sse(port=port)
    else:
        asyncio.run(main_stdio())