import csv
import sqlite3
import zipfile
from functools import lru_cache, wraps
from importlib.resources import as_file, files
from io import TextIOWrapper
import pandas as pd
[docs]
def cache(func):
@wraps(func) # Preserves the original function's metadata (docstring, name, etc.)
@lru_cache(maxsize=512) # Apply LRU cache to the wrapper
def wrapper(*args, **kwargs): # Accepts all arguments passed to the original function
return func(*args, **kwargs) # Call the original function with the arguments
return wrapper
[docs]
class CBEOpenData:
"""
A class to interact with CBE (Crossroads Bank for Enterprises) Open Data files.
This class provides methods to load, query, and analyze CBE Open Data, which contains information about
enterprises, establishments, and their associated data (e.g., addresses, activities, contacts, etc.).
The data is typically provided in a ZIP file containing CSV files, which are loaded into an SQLite database
for efficient querying.
The class supports:
- Loading data from a local ZIP file.
- Querying enterprise details, denominations, addresses, activities, contacts, and more.
- Replacing code-based fields with their human-readable descriptions using CBE code tables.
- Mimicking the data structure available on the CBE Public Search website.
Links:
- CBE Open Data Portal: https://kbopub.economie.fgov.be/kbo-open-data/
- CBE Public Search: https://kbopub.economie.fgov.be/kbopub/zoeknummerform.html
- CBE Extra Info: https://economie.fgov.be/nl/themas/ondernemingen/kruispuntbank-van/diensten-voor-iedereen/hergebruik-van-publieke
Attributes:
zip_path (str): Path to the ZIP file containing CBE Open Data.
db_path (str): Path to the SQLite database (defaults to in-memory database).
schema_path (str): Path to the SQL schema file for creating database tables.
force_load (bool): Whether to force reloading of data even if it already exists in the database.
db_conn (sqlite3.Connection): Connection to the SQLite database.
zip_file (zipfile.ZipFile): Handle to the opened ZIP file.
Methods:
__init__: Initializes the CBEOpenData instance and loads data into the database.
query: Executes a generic SQL query on the database.
get_code: Fetches a code table entry for a given category, code, and language.
get_denominations: Retrieves denominations for a given entity.
get_addresses: Retrieves addresses for a given entity.
get_contacts: Retrieves contacts for a given entity.
get_activities: Retrieves activities for a given entity.
get_branches: Retrieves branches for a given enterprise.
get_establishments: Retrieves establishments for a given enterprise.
get_enterprise: Retrieves enterprise details with code-based fields replaced by descriptions.
search: Retrieves all information tied to an enterprise number, mimicking the CBE Public Search website.
Example:
>>> cbe_open_data = CBEOpenData(
zip_path='KboOpenData_0133_2025_03_Full.zip',
db_path='cbe_open_data.db',
force_load=True
)
>>> enterprise_info = cbe_open_data.search("1234.567.890")
>>> print(enterprise_info)
Notes:
- The class uses an LRU cache to optimize repeated queries.
- Ensure the schema file is correctly configured to match the CSV file structure.
"""
REQUIRED_FILES = {
"activity.csv",
"address.csv",
"branch.csv",
"code.csv",
"contact.csv",
"denomination.csv",
"enterprise.csv",
"establishment.csv",
"meta.csv",
}
[docs]
def __init__(self, zip_path: str, db_path: str = ":memory:", force_load: bool = False, schema_path: str = None):
"""
Initialize the CBEOpenData instance.
Args:
zip_path (str): Path to the ZIP file.
db_path (str, optional): Path to the SQLite database. Defaults to `:memory:`.
force_load (bool, optional): Whether to force data reload. Defaults to `False`.
schema_path (str, optional): Path to the SQL schema file. Defaults to the packaged `schema.sql`.
"""
self.zip_path = zip_path
self.zip_file = zipfile.ZipFile(self.zip_path, "r")
self.db_path = db_path
self.force_load = force_load
self.schema_path = schema_path or self._get_default_schema_path()
self.db_conn = sqlite3.connect(self.db_path)
self.db_conn.row_factory = sqlite3.Row # Enable row access by column name
self.db_conn.execute("PRAGMA journal_mode = OFF;")
self.db_conn.execute("PRAGMA synchronous = 0;")
self.db_conn.execute("PRAGMA locking_mode = EXCLUSIVE;")
self.db_conn.execute("PRAGMA temp_store = MEMORY;")
# Validate required files
zip_contents = set(self.zip_file.namelist())
missing_files = self.REQUIRED_FILES - zip_contents
if missing_files:
raise FileNotFoundError(f"Missing required files in ZIP: {', '.join(missing_files)}")
with self.zip_file.open("meta.csv") as meta_file:
reader = csv.DictReader(TextIOWrapper(meta_file, encoding="utf-8"))
# Find ExtractNumber
self.extract_number = -1 # Default if not found
for row in reader:
if row.get("Variable") == "ExtractNumber":
self.extract_number = int(row["Value"])
break # No need to read further
# Check if the meta table exists and if the extract number matches
if not self._should_load_data():
print("Warning: Skipping data loading because the extract number matches and force_load=False.")
print("Info: To force a reload of the data, set force_load=True.")
return
self._load_schema()
# Load CSV data from the ZIP file
for csv_name in self.zip_file.namelist():
if csv_name.endswith(".csv"):
print(f"Loading {csv_name}...")
self._load_csv_file(csv_name)
def _get_default_schema_path(self) -> str:
"""Get the default schema path from the package."""
schema_resource = files("cbe").joinpath("schema.sql")
with as_file(schema_resource) as schema_file:
return str(schema_file)
def _should_load_data(self) -> bool:
"""
Check if the data should be loaded based on the extract number and force_load flag.
Returns:
bool: True if data should be loaded, False otherwise.
"""
cursor = self.db_conn.cursor()
# Check if the meta table exists
cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='meta';")
meta_table_exists = cursor.fetchone() is not None
if not meta_table_exists:
print("Warning: The meta table does not exist. Data will be loaded.")
return True
# Get the current extract number from the meta table
cursor.execute("SELECT Value FROM meta WHERE Variable = 'ExtractNumber';")
row = cursor.fetchone()
if not row:
print("Warning: No extract number found in the meta table. Data will be loaded.")
return True
current_extract_number = int(row["Value"])
# Check if the extract number matches
if self.extract_number is not None and current_extract_number == self.extract_number:
# Matched but check if force_load is True
if self.force_load:
print("Warning: Extract number matches, but force_load is True. Data will be reloaded.")
return True
else:
return False # Skip loading
else:
print(
(
"Warning: Extract number mismatch. "
f"Current: {current_extract_number}, Requested: {self.extract_number}. "
"Data will be loaded."
)
)
return True
def _load_schema(self) -> None:
"""Load SQL statements from a schema file and execute them to create tables and indexes."""
with open(self.schema_path, "r", encoding="utf-8") as file:
sql_script = file.read()
print("Creating SQL tables")
with self.db_conn:
self.db_conn.executescript(sql_script)
def _load_csv_file(self, csv_name: str) -> None:
"""Load CSV file into the respective table."""
table_name = csv_name.split(".")[0]
# Read the CSV file in chunks to avoid loading the entire file into memory
chunk_iter = pd.read_csv(
self.zip_file.open(csv_name),
chunksize=1000000, # Adjust chunk size based on memory constraints
dtype=str, # Ensure all data is treated as strings
keep_default_na=False, # Avoid interpreting empty strings as NaN
)
rows_affected = 0
# Process each chunk
for chunk in chunk_iter:
# Append to the existing table
rows_affected += chunk.to_sql(
name=table_name,
con=self.db_conn,
if_exists="append",
index=False, # Don't write row indices to the database
)
print(f"Successfully loaded {rows_affected} rows from {csv_name} into {table_name}")
def __del__(self) -> None:
"""Clean up resources when instance is destroyed."""
print("Cleaning up resources...")
if hasattr(self, "db_conn") and self.db_conn:
print("Closing database connection")
self.db_conn.close()
if hasattr(self, "zip_file") and self.zip_file:
print("Closing ZIP file")
self.zip_file.close()
[docs]
@cache
def query(self, query: str, params: tuple = ()) -> list[sqlite3.Row]:
"""Generic query method."""
cursor = self.db_conn.cursor()
cursor.execute(query, params)
return cursor.fetchall()
[docs]
@cache
def get_code(self, category: str, code: str, language: str = "NL") -> dict:
"""
Fetch the code table entry for a given category, code, and language.
Args:
category (str): The category of the code (e.g., "Status", "JuridicalSituation").
code (str): The code to look up.
language (str): The language of the description (default is "NL" for Dutch).
Returns:
dict: A dictionary containing the code, language, and description.
"""
cursor = self.db_conn.cursor()
cursor.execute(
"SELECT Code, Language, Description FROM code WHERE Category = ? AND Code = ? AND Language = ?;", (category, code, language)
)
row = cursor.fetchone()
return dict(row) if row else None
[docs]
@cache
def get_denominations(self, entity_number: str, language: str = "NL") -> list:
"""
Retrieve denominations for a given entity (enterprise or establishment).
Args:
entity_number (str): The entity number (enterprise or establishment number).
language (str): The language for code table descriptions.
Returns:
list: A list of denominations with code table lookups applied.
"""
cursor = self.db_conn.cursor()
cursor.execute("SELECT * FROM denomination WHERE EntityNumber = ?;", (entity_number,))
denominations = []
for row in cursor.fetchall():
row = dict(row)
row["Language"] = self.get_code("Language", row["Language"], language)
row["TypeOfDenomination"] = self.get_code("TypeOfDenomination", row["TypeOfDenomination"], language)
denominations.append(row)
return denominations
[docs]
@cache
def get_addresses(self, entity_number: str, language: str = "NL") -> list:
"""
Retrieve addresses for a given entity (enterprise or establishment).
Args:
entity_number (str): The entity number (enterprise or establishment number).
language (str): The language for code table descriptions.
Returns:
list: A list of addresses with code table lookups applied.
"""
cursor = self.db_conn.cursor()
cursor.execute("SELECT * FROM address WHERE EntityNumber = ?;", (entity_number,))
addresses = []
for row in cursor.fetchall():
row = dict(row)
row["TypeOfAddress"] = self.get_code("TypeOfAddress", row["TypeOfAddress"], language)
addresses.append(row)
return addresses
[docs]
@cache
def get_activities(self, entity_number: str, language: str = "NL") -> list:
"""
Retrieve activities for a given entity (enterprise or establishment).
Args:
entity_number (str): The entity number (enterprise or establishment number).
language (str): The language for code table descriptions.
Returns:
list: A list of activities with code table lookups applied.
"""
cursor = self.db_conn.cursor()
cursor.execute("SELECT * FROM activity WHERE EntityNumber = ?;", (entity_number,))
activities = []
for row in cursor.fetchall():
row = dict(row)
row["ActivityGroup"] = self.get_code("ActivityGroup", row["ActivityGroup"], language)
row["Classification"] = self.get_code("Classification", row["Classification"], language)
nace_version = row["NaceVersion"]
nace_code = row["NaceCode"]
if nace_version == "2003":
row["NaceCode"] = self.get_code("Nace2003", nace_code, language)
elif nace_version == "2008":
row["NaceCode"] = self.get_code("Nace2008", nace_code, language)
elif nace_version == "2025":
row["NaceCode"] = self.get_code("Nace2025", nace_code, language)
activities.append(row)
return activities
[docs]
@cache
def get_branches(self, enterprise_number: str) -> list:
"""
Retrieve branches for a given enterprise.
Args:
enterprise_number (str): The enterprise number in the format 'XXXX.XXX.XXX'.
Returns:
list: A list of branches.
"""
cursor = self.db_conn.cursor()
cursor.execute("SELECT * FROM branch WHERE EnterpriseNumber = ?;", (enterprise_number,))
return [dict(row) for row in cursor.fetchall()]
[docs]
@cache
def get_establishments(self, enterprise_number: str, language: str = "NL") -> list:
"""
Retrieve establishments for a given enterprise, including their addresses and denominations.
Args:
enterprise_number (str): The enterprise number in the format 'XXXX.XXX.XXX'.
language (str): The language for code table descriptions.
Returns:
list: A list of establishments with their addresses and denominations.
"""
cursor = self.db_conn.cursor()
cursor.execute("SELECT * FROM establishment WHERE EnterpriseNumber = ?;", (enterprise_number,))
establishments = []
for row in cursor.fetchall():
establishment = dict(row)
establishment["addresses"] = self.get_addresses(establishment["EstablishmentNumber"], language)
establishment["denominations"] = self.get_denominations(establishment["EstablishmentNumber"], language)
establishment["contacts"] = self.get_contacts(establishment["EstablishmentNumber"], language)
establishments.append(establishment)
return establishments
[docs]
@cache
def get_enterprise(self, enterprise_number: str, language: str = "NL") -> dict:
"""
Retrieve enterprise details and replace code-based fields with their descriptions.
Args:
enterprise_number (str): The enterprise number in the format 'XXXX.XXX.XXX'.
language (str): The language for code table descriptions.
Returns:
dict: A dictionary containing enterprise details with code table lookups applied.
"""
cursor = self.db_conn.cursor()
cursor.execute("SELECT * FROM enterprise WHERE EnterpriseNumber = ?;", (enterprise_number,))
enterprise = cursor.fetchone()
if enterprise:
enterprise = dict(enterprise)
enterprise["Status"] = self.get_code("Status", enterprise["Status"], language)
enterprise["JuridicalSituation"] = self.get_code("JuridicalSituation", enterprise["JuridicalSituation"], language)
enterprise["TypeOfEnterprise"] = self.get_code("TypeOfEnterprise", enterprise["TypeOfEnterprise"], language)
if enterprise["JuridicalForm"]:
enterprise["JuridicalForm"] = self.get_code("JuridicalForm", enterprise["JuridicalForm"], language)
return enterprise
[docs]
@cache
def search(self, enterprise_number: str, language: str = "NL") -> dict:
"""
Retrieve all information tied to an enterprise number, including enterprise details, denominations, addresses,
establishments, contacts, activities, and branches. This function mimics the data structure and details
available on the CBE Public Search website (https://kbopub.economie.fgov.be/kbopub/zoeknummerform.html).
The function replaces code-based fields (e.g., Status, JuridicalSituation, TypeOfEnterprise) with their
corresponding descriptions from the CBE code tables, based on the specified language.
Args:
enterprise_number (str): The enterprise number in the format 'XXXX.XXX.XXX'.
language (str): The language for code table descriptions (e.g., "NL", "FR", "DE", "EN").
Returns:
dict: A dictionary containing all information tied to the enterprise, with code table lookups applied.
"""
enterprise_data = self.get_enterprise(enterprise_number, language)
if not enterprise_data:
raise ValueError(f"Enterprise '{enterprise_number}' not found.")
enterprise_data.update(
{
"EnterpriseNumber": enterprise_number,
"denominations": self.get_denominations(enterprise_number, language),
"addresses": self.get_addresses(enterprise_number, language),
"establishments": self.get_establishments(enterprise_number, language),
"contacts": self.get_contacts(enterprise_number, language),
"activities": self.get_activities(enterprise_number, language),
"branches": self.get_branches(enterprise_number),
}
)
return enterprise_data