"""
Created on 2020-09-23
@author: wf
"""
import os
import re
import yaml
from lodstorage.query import QueryManager
from lodstorage.sparql import SPARQL
from geograpy.utils import Profiler
[docs]
class Wikidata(object):
"""
Wikidata access with proper User-Agent and rate limiting
"""
# Rate limiting constants for Wikidata
# see https://stackoverflow.com/questions/62396801/how-to-handle-too-many-requests-on-wikidata-using-sparqlwrapper
CALLS_PER_MINUTE = 30
def __init__(
self,
endpoint: str = None,
endpoint_name: str = "wikidata-qlever",
profile: bool = True,
calls_per_minute: int = None
):
"""
Constructor
Args:
endpoint(str): the SPARQL endpoint URL (overrides endpoint_name if provided)
endpoint_name(str): name of endpoint from endpoints.yaml (default: wikidata-qlever)
profile(bool): if True show profiling information
calls_per_minute(int): rate limit for API calls (uses endpoint config if not specified)
"""
module_dir = os.path.dirname(__file__)
# Load endpoints configuration
endpoints_path = os.path.join(module_dir, "data", "endpoints.yaml")
with open(endpoints_path, 'r') as f:
endpoints_config = yaml.safe_load(f)
# If endpoint URL is provided directly, use it
if endpoint:
self.endpoint = endpoint
self.calls_per_minute = calls_per_minute or self.CALLS_PER_MINUTE
else:
# Use endpoint_name to lookup configuration
if endpoint_name not in endpoints_config['endpoints']:
raise ValueError(f"Endpoint '{endpoint_name}' not found in endpoints.yaml")
endpoint_cfg = endpoints_config['endpoints'][endpoint_name]
self.endpoint = endpoint_cfg['endpoint']
self.calls_per_minute = calls_per_minute or endpoint_cfg.get('calls_per_minute', self.CALLS_PER_MINUTE)
self.profile = profile
# Load queries from queries.yaml
queries_path = os.path.join(module_dir, "data", "queries.yaml")
self.qm = QueryManager(lang="sparql", queriesPath=queries_path, with_default=False)
[docs]
def testAvailability(self) -> bool:
"""
test if the endpoint is available with a fast query
Returns:
bool: True if endpoint is available, False otherwise
"""
try:
query = self.qm.queriesByName.get("EndpointAvailability")
if not query:
# Fallback simple query if EndpointAvailability not in queries.yaml
queryString = "SELECT ?item WHERE { BIND(<http://www.wikidata.org/entity/Q2> as ?item) } LIMIT 1"
else:
queryString = query.query
wd = SPARQL(self.endpoint, calls_per_minute=self.calls_per_minute)
wd.sparql.setTimeout(5.0)
results = wd.query(queryString)
return results is not None
except Exception:
return False
[docs]
def query(self, msg, queryString: str, limit=None) -> list:
"""
get the query result
Args:
msg(str): the profile message to display
queryString(str): the query to execute
Return:
list: the list of dicts with the result
"""
profile = Profiler(msg, profile=self.profile)
# Create SPARQL instance with rate limiting and proper User-Agent
wd = SPARQL(self.endpoint, calls_per_minute=self.calls_per_minute)
limitedQuery = queryString
if limit is not None:
limitedQuery = f"{queryString} LIMIT {limit}"
results = wd.query(limitedQuery)
lod = wd.asListOfDicts(results)
for record in lod:
for key in list(record.keys()):
value = record[key]
if isinstance(value, str):
if value.startswith("http://www.wikidata.org/"):
record[key] = self.getWikidataId(value)
if key.lower().endswith("coord"):
lat, lon = Wikidata.getCoordinateComponents(value)
record["lat"] = lat
record["lon"] = lon
record.pop(key)
profile.time(f"({len(lod)})")
return lod
[docs]
def store2DB(self, lod, tableName: str, primaryKey: str = None, sqlDB=None):
"""
store the given list of dicts to the database
Args:
lod(list): the list of dicts
tableName(str): the table name to use
primaryKey(str): primary key (if any)
sqlDB(SQLDB): target SQL database
"""
msg = f"Storing {tableName}"
profile = Profiler(msg, profile=self.profile)
entityInfo = sqlDB.createTable(
lod,
entityName=tableName,
primaryKey=primaryKey,
withDrop=True,
sampleRecordCount=-1,
)
sqlDB.store(lod, entityInfo, fixNone=True)
profile.time()
[docs]
def getCountries(self, limit=None):
"""
get a list of countries
"""
query = self.qm.queriesByName.get("Countries")
if not query:
raise ValueError("Countries query not found in queries.yaml")
msg = "Getting countries from wikidata ETA 10s"
countryList = self.query(msg, query.query, limit=limit)
return countryList
[docs]
def getRegions(self, limit=None):
"""
get Regions from Wikidata
"""
query = self.qm.queriesByName.get("Regions")
if not query:
raise ValueError("Regions query not found in queries.yaml")
msg = "Getting regions from wikidata ETA 15s"
regionList = self.query(msg, query.query, limit=limit)
return regionList
[docs]
def getCities(self, limit=1000000):
"""
get all human settlements as list of dict with duplicates for label, region, country ...
"""
query = self.qm.queriesByName.get("Cities")
if not query:
raise ValueError("Cities query not found in queries.yaml")
msg = "Getting cities (human settlements) from wikidata ETA 50 s"
citiesList = self.query(msg, query.query, limit=limit)
return citiesList
[docs]
def getCitiesForRegion(self, regionId, msg):
"""
get the cities for the given Region
"""
regionPath = (
"?region ^wdt:P131/^wdt:P131/^wdt:P131 ?cityQ."
if regionId in ["Q980", "Q21"]
else "?cityQ wdt:P131* ?region."
)
queryString = """# get cities by region for geograpy3
PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#>
PREFIX wdt: <http://www.wikidata.org/prop/direct/>
PREFIX wd: <http://www.wikidata.org/entity/>
SELECT distinct (?cityQ as ?wikidataid) ?name ?geoNameId ?gndId ?regionId ?countryId ?pop ?coord WHERE {
VALUES ?hsType {
wd:Q1549591 wd:Q3957 wd:Q5119 wd:Q15284 wd:Q62049 wd:Q515 wd:Q1637706 wd:Q1093829 wd:Q486972 wd:Q532
}
VALUES ?region {
wd:%s
}
# region the city should be in
%s
# type of human settlement to try
?hsType ^wdt:P279*/^wdt:P31 ?cityQ.
# label of the City
?cityQ rdfs:label ?name filter (lang(?name) = "en").
# geoName Identifier
OPTIONAL {
?cityQ wdt:P1566 ?geoNameId.
}
# GND-ID
OPTIONAL {
?cityQ wdt:P227 ?gndId.
}
OPTIONAL{
?cityQ wdt:P625 ?coord .
}
# region this city belongs to
OPTIONAL {
?cityQ wdt:P131 ?regionId .
}
OPTIONAL {
?cityQ wdt:P1082 ?pop
}
# country this city belongs to
OPTIONAL {
?cityQ wdt:P17 ?countryId .
}
}""" % (
regionId,
regionPath,
)
regionCities = self.query(msg, queryString)
return regionCities
[docs]
def getCityStates(self, limit=None):
"""
get city states from Wikidata
"""
query = self.qm.queriesByName.get("CityStates")
if not query:
raise ValueError("CityStates query not found in queries.yaml")
msg = "Getting regions from wikidata ETA 15s"
cityStateList = self.query(msg, query.query, limit=limit)
return cityStateList
[docs]
@staticmethod
def getCoordinateComponents(coordinate: str) -> (float, float):
"""
Converts the wikidata coordinate representation into its subcomponents longitude and latitude
Example: 'Point(-118.25 35.05694444)' results in ('-118.25' '35.05694444')
Args:
coordinate: coordinate value in the format as returned by wikidata queries
Returns:
Returns the longitude and latitude of the given coordinate as separate values
"""
# https://stackoverflow.com/a/18237992/1497139
floatRegex = r"[-+]?\d+([.,]\d*)?"
regexp = rf"Point\((?P<lon>{floatRegex})\s+(?P<lat>{floatRegex})\)"
cMatch = None
if coordinate:
try:
cMatch = re.search(regexp, coordinate)
except Exception as ex:
# ignore
pass
if cMatch:
latStr = cMatch.group("lat")
lonStr = cMatch.group("lon")
lat, lon = float(latStr.replace(",", ".")), float(lonStr.replace(",", "."))
if lon > 180:
lon = lon - 360
return lat, lon
else:
# coordinate does not have the expected format
return None, None
[docs]
@staticmethod
def getWikidataId(wikidataURL: str):
"""
Extracts the wikidata id from the given wikidata URL
Args:
wikidataURL: wikidata URL the id should be extracted from
Returns:
The wikidata id if present in the given wikidata URL otherwise None
"""
# regex pattern taken from https://www.wikidata.org/wiki/Q43649390 and extended to also support property ids
wikidataidMatch = re.search(r"[PQ][1-9]\d*", wikidataURL)
if wikidataidMatch and wikidataidMatch.group(0):
wikidataid = wikidataidMatch.group(0)
return wikidataid
else:
return None
[docs]
@staticmethod
def getValuesClause(varName: str, values, wikidataEntities: bool = True):
"""
generates the SPARQL value clause for the given variable name containing the given values
Args:
varName: variable name for the ValuesClause
values: values for the clause
wikidataEntities(bool): if true the wikidata prefix is added to the values otherwise it is expected taht the given values are proper IRIs
Returns:
str
"""
clauseValues = ""
if isinstance(values, list):
for value in values:
if wikidataEntities:
clauseValues += f"wd:{value} "
else:
clauseValues += f"{value} "
else:
if wikidataEntities:
clauseValues = f"wd:{values} "
else:
clauseValues = f"{values} "
clause = "VALUES ?%s { %s }" % (varName, clauseValues)
return clause