Files
BasesCGL/export_lignees_to_gedcom.py
T
yann64 f75cbebb44 Initial commit: GEDCOM export scripts and generated filiations
Includes export_lignees_to_gedcom.py (Drupal book → GEDCOM 5.5.1),
export_users_to_webtrees.py, generated GEDCOM files for 16 family
lineages, and webtrees user import SQL. Excludes basesgen.sql (966 MB)
and webtrees_temp_passwords.csv (sensitive).

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-28 13:44:28 +02:00

1691 lines
61 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
Drupal 6 'lignées familiales' → GEDCOM 5.5.1 exporter.
Reads basesgen.sql (drupal_node + drupal_node_revisions, type='book')
and writes one .ged file per family under ./gedcom_output/.
Persons are identified by their hierarchical ID (e.g. "1.4.4b.1"):
- dots separate parent→child relationships
- a letter suffix on the last component (a/b/c…) identifies which
union of the parent produced this child
Usage:
python3 export_lignees_to_gedcom.py
"""
import re
import sys
import html
from pathlib import Path
from bs4 import BeautifulSoup
# ── Configuration ─────────────────────────────────────────────────────────────
SQL_FILE = Path("/home/yann64/BaseCGL/basesgen.sql")
OUT_DIR = Path("/home/yann64/BaseCGL/gedcom_output")
SOURCE_STR = "CGL Bases généalogiques du Languedoc basesgen.sql"
# ── French place hierarchy ────────────────────────────────────────────────────
# Maps département name → région name (post-2016 reform, mainland + DOM).
# Keys are title-cased; lookup is case-insensitive (see _expand_place).
_DEPT_TO_REGION: dict[str, str] = {
# Auvergne-Rhône-Alpes
"Ain": "Auvergne-Rhône-Alpes",
"Allier": "Auvergne-Rhône-Alpes",
"Ardèche": "Auvergne-Rhône-Alpes",
"Cantal": "Auvergne-Rhône-Alpes",
"Drôme": "Auvergne-Rhône-Alpes",
"Isère": "Auvergne-Rhône-Alpes",
"Loire": "Auvergne-Rhône-Alpes",
"Haute-Loire": "Auvergne-Rhône-Alpes",
"Puy-de-Dôme": "Auvergne-Rhône-Alpes",
"Rhône": "Auvergne-Rhône-Alpes",
"Savoie": "Auvergne-Rhône-Alpes",
"Haute-Savoie": "Auvergne-Rhône-Alpes",
# Bourgogne-Franche-Comté
"Côte-d'Or": "Bourgogne-Franche-Comté",
"Doubs": "Bourgogne-Franche-Comté",
"Jura": "Bourgogne-Franche-Comté",
"Nièvre": "Bourgogne-Franche-Comté",
"Haute-Saône": "Bourgogne-Franche-Comté",
"Saône-et-Loire": "Bourgogne-Franche-Comté",
"Yonne": "Bourgogne-Franche-Comté",
"Territoire de Belfort": "Bourgogne-Franche-Comté",
# Bretagne
"Côtes-d'Armor": "Bretagne",
"Finistère": "Bretagne",
"Ille-et-Vilaine": "Bretagne",
"Morbihan": "Bretagne",
# Centre-Val de Loire
"Cher": "Centre-Val de Loire",
"Eure-et-Loir": "Centre-Val de Loire",
"Indre": "Centre-Val de Loire",
"Indre-et-Loire": "Centre-Val de Loire",
"Loir-et-Cher": "Centre-Val de Loire",
"Loiret": "Centre-Val de Loire",
# Corse
"Corse-du-Sud": "Corse",
"Haute-Corse": "Corse",
# Grand Est
"Ardennes": "Grand Est",
"Aube": "Grand Est",
"Marne": "Grand Est",
"Haute-Marne": "Grand Est",
"Meurthe-et-Moselle": "Grand Est",
"Meuse": "Grand Est",
"Moselle": "Grand Est",
"Bas-Rhin": "Grand Est",
"Haut-Rhin": "Grand Est",
"Vosges": "Grand Est",
# Hauts-de-France
"Aisne": "Hauts-de-France",
"Nord": "Hauts-de-France",
"Oise": "Hauts-de-France",
"Pas-de-Calais": "Hauts-de-France",
"Somme": "Hauts-de-France",
# Île-de-France
"Paris": "Île-de-France",
"Ville-de-Paris": "Île-de-France",
"Seine-et-Marne": "Île-de-France",
"Yvelines": "Île-de-France",
"Essonne": "Île-de-France",
"Hauts-de-Seine": "Île-de-France",
"Seine-Saint-Denis": "Île-de-France",
"Val-de-Marne": "Île-de-France",
"Val-d'Oise": "Île-de-France",
# Normandie
"Calvados": "Normandie",
"Eure": "Normandie",
"Manche": "Normandie",
"Orne": "Normandie",
"Seine-Maritime": "Normandie",
# Nouvelle-Aquitaine
"Charente": "Nouvelle-Aquitaine",
"Charente-Maritime": "Nouvelle-Aquitaine",
"Corrèze": "Nouvelle-Aquitaine",
"Creuse": "Nouvelle-Aquitaine",
"Dordogne": "Nouvelle-Aquitaine",
"Gironde": "Nouvelle-Aquitaine",
"Landes": "Nouvelle-Aquitaine",
"Lot-et-Garonne": "Nouvelle-Aquitaine",
"Pyrénées-Atlantiques": "Nouvelle-Aquitaine",
"Deux-Sèvres": "Nouvelle-Aquitaine",
"Vienne": "Nouvelle-Aquitaine",
"Haute-Vienne": "Nouvelle-Aquitaine",
# Occitanie
"Ariège": "Occitanie",
"Aude": "Occitanie",
"Aveyron": "Occitanie",
"Gard": "Occitanie",
"Haute-Garonne": "Occitanie",
"Gers": "Occitanie",
"Hérault": "Occitanie",
"Lot": "Occitanie",
"Lozère": "Occitanie",
"Hautes-Pyrénées": "Occitanie",
"Pyrénées-Orientales": "Occitanie",
"Tarn": "Occitanie",
"Tarn-et-Garonne": "Occitanie",
# Pays de la Loire
"Loire-Atlantique": "Pays de la Loire",
"Maine-et-Loire": "Pays de la Loire",
"Mayenne": "Pays de la Loire",
"Sarthe": "Pays de la Loire",
"Vendée": "Pays de la Loire",
# Provence-Alpes-Côte d'Azur
"Alpes-de-Haute-Provence": "Provence-Alpes-Côte d'Azur",
"Hautes-Alpes": "Provence-Alpes-Côte d'Azur",
"Alpes-Maritimes": "Provence-Alpes-Côte d'Azur",
"Bouches-du-Rhône": "Provence-Alpes-Côte d'Azur",
"Var": "Provence-Alpes-Côte d'Azur",
"Vaucluse": "Provence-Alpes-Côte d'Azur",
# DOM
"Guadeloupe": "Guadeloupe",
"Martinique": "Martinique",
"Guyane": "Guyane",
"La Réunion": "La Réunion",
"Mayotte": "Mayotte",
# Historical names (pre-reform or pre-1969)
"Basses-Pyrénées": "Nouvelle-Aquitaine", # now Pyrénées-Atlantiques
"Basses Pyrénées": "Nouvelle-Aquitaine",
"Seine": "Île-de-France", # dissolved in 1968
"Seine-et-Oise": "Île-de-France", # dissolved in 1968
}
# Case-insensitive lookup index
_DEPT_LOWER: dict[str, str] = {k.lower(): v for k, v in _DEPT_TO_REGION.items()}
# Canonical name index (for display — preserves original casing)
_DEPT_CANONICAL: dict[str, str] = {k.lower(): k for k in _DEPT_TO_REGION}
# Matches a time prefix with a city following: "12 h 00 à City" / "15h au Mas"
_RE_TIME_WITH_CITY = re.compile(
r"^(\d+)\s*h(?:\s*(\d+))?\s+(?:aux?|[àa])\s+(.+)$", re.I)
# Matches a pure time string with no city
_RE_TIME_ONLY = re.compile(r"^(\d+)\s*h(?:\s*(\d+))?\s*$", re.I)
def _split_place(raw: str) -> tuple[str, str]:
"""
Split a raw place string into (gedcom_time, place_without_prefix).
Returns:
- ('HH:MM', 'City (Dept)') for '12 h 00 à City (Dept)'
- ('HH:MM', '') for '2 h 30' (pure time, no city)
- ('', raw) when no time prefix is found
"""
if not raw:
return "", ""
m = _RE_TIME_WITH_CITY.match(raw)
if m:
hours, minutes = int(m.group(1)), int(m.group(2) or 0)
return f"{hours:02d}:{minutes:02d}", m.group(3).strip()
m = _RE_TIME_ONLY.match(raw)
if m:
hours, minutes = int(m.group(1)), int(m.group(2) or 0)
return f"{hours:02d}:{minutes:02d}", ""
return "", raw
def _expand_place(place: str) -> str:
"""
Convert 'Montpellier (Hérault)''Montpellier, Hérault, Occitanie, France'.
'Camarade (Ariège) - Machicot''Machicot, Camarade, Ariège, Occitanie, France'.
Call _split_place() first to strip any time prefix before passing here.
"""
if not place:
return ""
m = re.search(r"^(.*?)\s*\(([^)]+)\)\s*(?:-\s*(.+))?$", place)
if not m:
return place # no parenthetical département — return as-is
city = m.group(1).strip()
dept_raw = m.group(2).strip()
subdivision = m.group(3).strip() if m.group(3) else ""
dept_key = dept_raw.lower()
region = _DEPT_LOWER.get(dept_key, "")
dept_display = _DEPT_CANONICAL.get(dept_key, dept_raw)
parts = []
if subdivision:
parts.append(subdivision)
parts.append(city)
parts.append(dept_display)
if region:
parts.append(region)
parts.append("France")
return ", ".join(parts)
# ── Marriage contract / source parsing ───────────────────────────────────────
_RE_CONTRACT_SOURCE = re.compile(
r"^(contrat\s+de\s+mariage\s+.+?)\s+-\s+(.+?)\s+-\s+(.+)$", re.I)
_RE_FOLIO = re.compile(r"\b(folio\s+\S+(?:\s+\S+)*)\s*$", re.I)
def _parse_contract_source(text: str) -> dict:
"""
Parse 'Contrat de Mariage chez Maître X - Dépôt - Cote folio N'.
Returns {"title", "depot", "caln", "page"} or {}.
"""
m = _RE_CONTRACT_SOURCE.match(text.strip())
if not m:
return {}
title = m.group(1).strip()
depot = m.group(2).strip()
cote_raw = m.group(3).strip()
fm = _RE_FOLIO.search(cote_raw)
if fm:
page = fm.group(1)
caln = cote_raw[:fm.start()].strip()
else:
page = ""
caln = cote_raw
return {"title": title, "depot": depot, "caln": caln, "page": page}
# ── Spouse context parsing ─────────────────────────────────────────────────────
_RE_SPOUSE_CONTEXT = re.compile(
r"s[''`]unit\s+avec\s+(.+?)(?=\.\s+(?:Ce couple|Ils\s+se|Le couple)|\.?\s*$)",
re.I | re.S)
_RE_SPOUSE_LIFE = re.compile(
r"\((~?\d{4})\s*(?:-?>?\s*(~?\d{4}))?\)", re.I)
def _parse_spouse_context(full_text: str) -> dict:
"""
Extract name, birth, death, occupation from 's'unit avec ...' sentence.
Returns {"name", "birth", "death", "occu"}.
"""
m = _RE_SPOUSE_CONTEXT.search(full_text)
if not m:
return {}
ctx = m.group(1).strip()
# Name: up to first ( or ,
nm = re.match(r"([^(,]+)", ctx)
name = nm.group(1).strip() if nm else ctx.split(",")[0].strip()
# Dates from parenthetical (birth->death or ~birth)
birth = death = ""
dm = _RE_SPOUSE_LIFE.search(ctx)
if dm:
b_raw = dm.group(1)
b_year = b_raw.lstrip("~")
birth = f"ABT {b_year}" if b_raw.startswith("~") else b_year
if dm.group(2):
d_raw = dm.group(2)
d_year = d_raw.lstrip("~")
death = f"ABT {d_year}" if d_raw.startswith("~") else d_year
# Occupation: text after dates (or after name) before "le fils/la fille/les enfants"
occu = ""
after = ctx[dm.end():].strip() if dm else ctx[len(name):].strip()
after = after.lstrip(",").strip()
om = re.match(r"([^,(]+?)(?=\s*,\s*(?:le|la|les)\s+(?:fils|fille|enfant)|$)", after, re.I)
if om:
candidate = om.group(1).strip().rstrip(".")
# Only keep genuine occupations — reject parentage/family descriptions
if (candidate and len(candidate) < 60
and not re.search(r"\best\b|\bsont\b|\bfille\b|\bfils\b|\benfant\b|\bparents\b", candidate, re.I)):
occu = candidate
return {"name": name, "birth": birth, "death": death, "occu": occu}
# ── Gray paragraph grouping (witnesses, godparents, notes) ────────────────────
# Headers that introduce a group of witness lines
_RE_WITNESS_HDR = re.compile(
r"^(Présents?|Témoins?|Déclarants?)\s*:?\s*$", re.I)
_RE_MARR_WITNESS_HDR = re.compile(
r"^(Témoins?\s+au\s+mariage|Présents?\s+au\s+contrat|"
r"Présents?\s+à\s+la\s+célébration|Présents?\s+au\s+mariage)\b", re.I)
_RE_DEAT_WITNESS_HDR = re.compile(
r"^(Témoins?\s+au\s+décès|Présents?\s+au\s+décès)\b", re.I)
# Lines that are part of a witness list
_RE_WITNESS_ITEM = re.compile(r"^[\-–•]\s+\S", re.I)
# Single-line godparent references
_RE_GODPARENT = re.compile(r"^(Parrain|Marraine)\s*[:;]?\s+\S", re.I)
# Archive source references in gray paragraphs
_RE_GRAY_ARCHIVE = re.compile(
r"^Archives\s+d[eé]\w*\s+.+?(?:\s+-\s+|\s*:\s*)Registre\b", re.I)
# Marriage-specific gray notes (contract, publications, dispensation, etc.)
_RE_GRAY_MARR_NOTE = re.compile(
r"^(?:"
r"(?:Date\s+du\s+|(?:Un|Il\s+(?:existe\s+un|a\s+été\s+fait\s+un))\s+)?"
r"Contrat\s+de\s+[Mm]ariage\b"
r"|Contrat\s+passé\s+(?:chez|devant|par)\b"
r"|Accord\s+(?:chez|devant|par)\s+Ma[iî]tre\b"
r"|Acte\s+respectueux\b"
r"|L[''']acte\s+de\s+mariage\b"
r"|Publications?\s+de\s+[Mm]ariage\b"
r"|Dispense\s+(?:de|au|du)\b"
r"|La\s+mariée?\s+dit\b"
r"|Le\s+marié?\s+dit\b"
r")", re.I)
def _group_gray_notes(all_paras: list[dict]) -> dict:
"""
Scan paragraphs in order and group gray ones by event association.
Returns {
"birth_notes": list of multiline strings (for BIRT event NOTE)
"death_notes": list of multiline strings (for DEAT event NOTE)
"marriage_notes": list of multiline strings (for MARR event NOTE)
"general_notes": list of single-line strings (for INDI NOTE)
}
"""
result: dict[str, list] = {
"birth_notes": [], "death_notes": [],
"marriage_notes": [], "general_notes": [],
}
# Track which events have been encountered (determines gray-note assignment)
seen_marr = False
seen_deat = False
current_group: list[str] = [] # lines of the current witness group
current_key: str = "" # "birth_notes" | "death_notes" | "marriage_notes"
def flush():
nonlocal current_group, current_key
if current_group and current_key:
result[current_key].append("\n".join(current_group))
current_group = []
current_key = ""
for para in all_paras:
color = para["color"]
text = para["text"].strip()
if not text:
continue
# ── Non-gray paragraphs: update event context and flush ──
if color != "gray":
flush()
if re.search(r"\bse marient\b|contrat de mariage\s+le\b", text, re.I):
seen_marr = True
if re.search(r"\best décédé\b|\bmeurt le\b", text, re.I):
seen_deat = True
continue
# ── Gray paragraph ──
# Determine which event key to use for ungrouped gray items
def current_event_key() -> str:
if seen_deat:
return "death_notes"
if seen_marr:
return "marriage_notes"
return "birth_notes"
# 1a. Marriage-specific notes (contracts, publications, dispensations…)
if _RE_GRAY_MARR_NOTE.match(text):
flush()
if len(text) > 10:
result["marriage_notes"].append(text)
continue
# 1b. Archive/registry references → contextual event bucket
if _RE_GRAY_ARCHIVE.match(text):
flush()
if len(text) > 10:
result[current_event_key()].append(text)
continue
# 2. Explicit death-witness header
if _RE_DEAT_WITNESS_HDR.match(text):
flush()
current_key = "death_notes"
current_group = [text]
continue
# 3. Explicit marriage-witness header
if _RE_MARR_WITNESS_HDR.match(text):
flush()
current_key = "marriage_notes"
current_group = [text]
continue
# 4. Generic witness header ("Présents :", "Témoins :", "Déclarants :")
if _RE_WITNESS_HDR.match(text):
flush()
current_key = current_event_key()
current_group = [text]
continue
# 5. Witness item line (starts with "-") — append to current group or start one
if _RE_WITNESS_ITEM.match(text):
if current_key:
current_group.append(text)
else:
# Orphan item (no preceding header) — start implicit group
current_key = current_event_key()
current_group = [text]
continue
# 6. Single-line godparent reference → birth note
if _RE_GODPARENT.match(text):
flush()
if len(text) > 5:
result["birth_notes"].append(text)
continue
# 7. Everything else → flush any group, then route by current event context
flush()
if len(text) > 10:
result[current_event_key()].append(text)
flush()
return result
def _emit_note_block(lines: list[str], base_level: int) -> list[str]:
"""
Emit a multiline note as GEDCOM NOTE + CONT lines.
base_level: level of the NOTE line (2 for event-level, 1 for INDI-level).
"""
out = []
note_lines = lines
if len(note_lines) == 1:
out.append(gedcom_line(base_level, "NOTE", note_lines[0][:248]))
else:
out.append(gedcom_line(base_level, "NOTE", note_lines[0][:248]))
for continuation in note_lines[1:]:
out.append(gedcom_line(base_level + 1, "CONT", continuation[:248]))
return out
def _emit_grouped_notes(note_strings: list[str], base_level: int) -> list[str]:
"""Emit a list of note strings (each possibly multiline) as GEDCOM NOTE blocks."""
out = []
for ns in note_strings:
out.extend(_emit_note_block(ns.split("\n"), base_level))
return out
# ── French calendar helpers ───────────────────────────────────────────────────
MOIS_FR = {
"janvier": "JAN", "février": "FEB", "fevrier": "FEB",
"mars": "MAR", "avril": "APR", "mai": "MAY", "juin": "JUN",
"juillet": "JUL", "août": "AUG", "aout": "AUG",
"septembre": "SEP", "octobre": "OCT", "novembre": "NOV", "décembre": "DEC",
"decembre": "DEC",
}
MOIS_PAT = "|".join(MOIS_FR.keys())
JOURS_PAT = "lundi|mardi|mercredi|jeudi|vendredi|samedi|dimanche"
def fr_date(day: str | None, month_fr: str | None, year: str | None,
prefix: str = "") -> str:
"""Convert French date parts to GEDCOM date string."""
parts = []
if prefix:
parts.append(prefix)
if day and day not in ("0", ""):
parts.append(str(int(day)))
if month_fr:
m = MOIS_FR.get(month_fr.lower().strip())
if m:
parts.append(m)
if year:
parts.append(year)
return " ".join(parts) if parts else ""
# ── Regex patterns ────────────────────────────────────────────────────────────
_D = rf"(?:(?:{JOURS_PAT})\s+)?(\d+)(?:er|ème|eme|e)?\s+({MOIS_PAT})\s+(\d{{4}})"
# Birth
RE_BORN_FULL = re.compile(
rf"(?:voit le jour|est n[eé]e?|naît)\s+le\s+{_D}"
rf"(?:\s+[àa]\s+(.*?))?(?:\.|$)", re.I)
RE_BORN_APPROX = re.compile(
rf"(?:voit le jour|est n[eé]e?|né[e]?)\s+vers\s+(?:({MOIS_PAT})\s+)?(\d{{4}})", re.I)
RE_BORN_YEAR = re.compile(
rf"(?:né[e]?|voit le jour)\s+(?:en|vers)\s+(\d{{4}})", re.I)
RE_BAPTISM = re.compile(
rf"(?:est baptisé[e]?)\s+[àa]\s+(.*?),\s+le\s+{_D}", re.I)
RE_BORN_INLINE = re.compile(r"né[e]?\s+vers\s+(\d{4})", re.I) # "née vers 1699"
RE_BORN_EN = re.compile(r"né[e]?\s+en\s+(\d{4})", re.I)
# Death
RE_DEAD_FULL = re.compile(
rf"est décédé[e]?\s+le\s+{_D}(?:.*?[àa]\s+([\w\s'\(\),\-]+?))?(?:\.|$)", re.I)
RE_DEAD_BEF = re.compile(r"est décédé[e]?\s+avant\s+(.+?)(?:\.|$)", re.I)
RE_DEAD_AFT = re.compile(r"est décédé[e]?\s+après\s+(\d{4})", re.I)
RE_DEAD_YEAR = re.compile(r"est décédé[e]?\s+en\s+(\d{4})", re.I)
# RE_MEURT: only matches when NOT preceded by a relative clause in the same sentence
# (sentences starting with "Sa mère/son père/son époux meurt" are excluded in parse_death)
RE_MEURT = re.compile(
rf"meurt\s+le\s+{_D}", re.I)
_RE_RELATIVE_MEURT = re.compile(
r"\b(?:sa|son)\s+(?:père|mère|époux|épouse|mari|femme|frère|sœur)\b[^.]*meurt", re.I)
# Marriage
RE_MARR = re.compile(
rf"(?:se marient|mariage (?:civil|religieux|est célébré))[^\d]*le\s+{_D}"
rf"(?:\s+[àa]\s+(.*?))?(?:\.|$)", re.I)
RE_CONTRAT = re.compile(
rf"contrat de mariage\s+le\s+{_D}"
rf"(?:\s+[àa]\s+(.*?))?(?:\.|$)", re.I)
RE_SPOUSE = re.compile(
r"(?:Il|Elle)\s+s[''`]unit\s+avec\s+(.*?)(?:,|\()", re.I)
RE_SPOUSE_DATES = re.compile(r"\(([~\d]{4})-?([~\d]{4})?\)", re.I)
# Occupation
RE_OCCU_SERA = re.compile(r"\w+\s+sera\s+([^.]+)\.", re.I)
RE_OCCU_EST = re.compile(r"\w+\s+est\s+([a-zéàèù][a-zéàèù\-\s]+?)[\.,]", re.I)
# Person ID header — matches standalone IDs like "1", "1a", "1.2", "1.2b", "1.4.4b.1"
RE_PERSON_ID = re.compile(r"^(\d+[a-z]?(?:\.\d+[a-z]?)*)\s*$", re.I)
# Name line (bold): "Pierre FABRE voit le jour..."
RE_NAME_LINE = re.compile(
r"^([A-ZÀ-Ü][a-zà-ü\-]+(?:\s+[A-ZÀ-Ü][a-zà-ü\-]+)*" # first name(s)
r"\s+[A-ZÀÂÇÉÈÊËÎÏÔÛÙÜŸÆŒ][A-ZÀÂÇÉÈÊËÎÏÔÛÙÜŸÆŒ\s'\-]+?)" # SURNAME
r"\s+(?:voit le jour|est né[e]?|naît|né[e]?\s)", re.I
)
# Sex from prose
RE_FILS = re.compile(r"\b(Il|Elle)\s+est\s+(?:l[e']|le)\s*fils\b", re.I)
RE_FILLE = re.compile(r"\b(Il|Elle)\s+est\s+(?:l[ae']|la)\s*fille\b", re.I)
RE_FILS2 = re.compile(r"\bfils\s+(?:légitim|naturel)", re.I)
RE_FILLE2 = re.compile(r"\bfille\s+(?:légitim|naturell)", re.I)


def parse_sex(full_text: str) -> str:
    """
    Guess the person's sex from the prose: 'M', 'F', or '' when unknown.

    Order of evidence: a "fils" clue, then a "fille" clue, then the bare
    pronoun ("Il est" before "Elle est"). The pronoun test is
    case-sensitive.
    """
    if any(rx.search(full_text) for rx in (RE_FILS, RE_FILS2)):
        return "M"
    if any(rx.search(full_text) for rx in (RE_FILLE, RE_FILLE2)):
        return "F"

    # Pronoun fallback
    pronoun_clues = ((r"\bIl\s+est\b", "M"), (r"\bElle\s+est\b", "F"))
    for pattern, code in pronoun_clues:
        if re.search(pattern, full_text):
            return code
    return ""
def parse_birth(full_text: str) -> dict:
    """
    Extract the birth (or baptism) date and place from a person's text.

    Returns {"date", "plac", "type"} where "type" is 'BIRT' or 'BAPM'.
    Patterns are tried from most to least precise:
      1. full birth date, optionally with a place
      2. baptism with place and full date (type becomes 'BAPM')
      3. "vers [month] year"  -> 'ABT [MON] year'
      4. year only: "vers year" -> 'ABT year', "en year" -> 'year'
    Fields that cannot be determined stay empty.
    """
    result = {"date": "", "plac": "", "type": "BIRT"}

    m = RE_BORN_FULL.search(full_text)
    if m:
        result["date"] = fr_date(m.group(1), m.group(2), m.group(3))
        result["plac"] = _clean_place(m.group(4) or "")
        return result

    m = RE_BAPTISM.search(full_text)
    if m:
        result["type"] = "BAPM"
        result["plac"] = _clean_place(m.group(1) or "")
        result["date"] = fr_date(m.group(2), m.group(3), m.group(4))
        return result

    m = RE_BORN_APPROX.search(full_text)
    if m:
        result["date"] = fr_date(None, m.group(1), m.group(2), "ABT")
        return result

    for pat in (RE_BORN_YEAR, RE_BORN_EN, RE_BORN_INLINE):
        m = pat.search(full_text)
        if m:
            # "en 1850" states the year itself; only "vers 1850" is an
            # approximation. This matches how parse_death treats
            # "est décédé en <year>" (exact year, no ABT).
            approximate = re.search(r"\bvers\b", m.group(0), re.I) is not None
            result["date"] = ("ABT " if approximate else "") + m.group(1)
            return result

    return result
# Matches "à l'âge de ..." to be skipped in death sentences
_AGE_CLAUSE = re.compile(r",\s*[àa]\s+l['']\âge\s+de\s+[^,]+", re.I)
# Matches final place: last ", à Place" before period
_DEAD_PLACE = re.compile(r",\s*[àa]\s+([A-ZÀ-Ü][^,.]+?(?:\([A-Za-zÀ-Ü\s\-]+\))?)\s*(?:\.|$)", re.I)
def _extract_death_place(sentence: str) -> str:
    """
    Return the place named in a death sentence, or '' when there is none.

    The "à l'âge de ..." clause is removed first so it cannot be taken
    for a place; of the remaining ", à <Place>" candidates the last one
    wins and is normalised with _clean_place().
    """
    without_age = _AGE_CLAUSE.sub("", sentence)

    last_hit = None
    for last_hit in _DEAD_PLACE.finditer(without_age):
        pass  # keep advancing; only the final candidate matters

    if last_hit is None:
        return ""
    return _clean_place(last_hit.group(1))
def parse_death(full_text: str) -> dict:
    """
    Extract the death date and place from a person's text.

    Returns {"date", "plac"}; unknown fields stay empty. Patterns tried
    in order:
      1. "est décédé(e) le <full date> ..."  (date and, if present, place)
      2. "meurt le <full date>" in a sentence that is not about a relative
      3. "est décédé(e) avant ..."  -> 'BEF ...'
      4. "est décédé(e) après <year>" -> 'AFT <year>'
      5. "est décédé(e) en <year>"    -> '<year>'
    """
    result = {"date": "", "plac": ""}

    m = RE_DEAD_FULL.search(full_text)
    if m:
        result["date"] = fr_date(m.group(1), m.group(2), m.group(3))
        # RE_DEAD_FULL starts at "est décédé" and always ends on a period
        # or at the end of the text, so the matched span is the death
        # sentence. Searching for another "." after m.end() would either
        # run into the following sentence or, when no later period
        # exists, produce an empty slice and lose the place.
        result["plac"] = _extract_death_place(m.group(0))
        return result

    # "meurt le ...": accept the first occurrence whose sentence is about
    # the main person rather than a relative ("Sa mère meurt le ...").
    for m in RE_MEURT.finditer(full_text):
        sent_start = full_text.rfind(".", 0, m.start()) + 1
        sent_end = full_text.find(".", m.end())
        # No closing period: the sentence runs to the end of the text.
        sent_end = len(full_text) if sent_end == -1 else sent_end + 1
        sentence = full_text[sent_start:sent_end]
        if not _RE_RELATIVE_MEURT.search(sentence):
            result["date"] = fr_date(m.group(1), m.group(2), m.group(3))
            return result

    m = RE_DEAD_BEF.search(full_text)
    if m:
        raw = m.group(1).strip().split(",")[0].rstrip(".")
        result["date"] = "BEF " + raw
        return result

    m = RE_DEAD_AFT.search(full_text)
    if m:
        result["date"] = "AFT " + m.group(1)
        return result

    m = RE_DEAD_YEAR.search(full_text)
    if m:
        result["date"] = m.group(1)
        return result

    return result
_RE_NO_CHILDREN = re.compile(r"pas\s+d[''e]enfants|il\s+n[''y]\s+a\s+pas", re.I)
_RE_HAS_CHILDREN = re.compile(
r"(?:aura|a\s+eu|avez?|ont)\s+\w+\s+enfants?|(?:ce\s+couple|ils)\s+aura", re.I)
def _parse_one_marriage(segment: str) -> dict:
    """
    Parse one "s'unit avec …" segment into a marriage record.

    Returns {date, plac, spouse, spouse_birth, spouse_death, spouse_occu,
    source, has_children_text}. "source" is always an empty dict here.
    The date/place come from the wedding sentence when there is one,
    otherwise from the marriage-contract sentence.
    """
    record = {"date": "", "plac": "", "spouse": "",
              "spouse_birth": "", "spouse_death": "", "spouse_occu": "",
              "source": {},
              "has_children_text": False}

    spouse_info = _parse_spouse_context(segment)
    if spouse_info.get("name"):
        record["spouse"] = spouse_info["name"]
        for field in ("birth", "death", "occu"):
            record[f"spouse_{field}"] = spouse_info.get(field, "")
    else:
        # Simpler fallback pattern; drop any parenthetical from the name.
        fallback = RE_SPOUSE.search(segment)
        if fallback:
            record["spouse"] = re.sub(
                r"\s*\(.*?\)", "", fallback.group(1).strip()).strip()

    # First pattern that matches wins (wedding before contract).
    event = next(
        (hit for hit in (rx.search(segment) for rx in (RE_MARR, RE_CONTRAT))
         if hit),
        None,
    )
    if event:
        record["date"] = fr_date(event.group(1), event.group(2), event.group(3))
        record["plac"] = _clean_place(event.group(4) or "")

    # Children are mentioned unless the text also says there are none.
    mentions_children = bool(_RE_HAS_CHILDREN.search(segment))
    denies_children = bool(_RE_NO_CHILDREN.search(segment))
    record["has_children_text"] = mentions_children and not denies_children
    return record
def parse_marriages(full_text: str, italic_texts: list[str] | None = None) -> list[dict]:
"""
Return list of marriage dicts, one per union found in full_text.
Each dict: {spouse, spouse_birth, spouse_death, spouse_occu,
date, plac, source, has_children_text}.
"""
splits = [m.start() for m in re.finditer(r"\bs[''`]unit\s+avec\b", full_text, re.I)]
if not splits:
return []
marriages = []
for i, start in enumerate(splits):
end = splits[i + 1] if i + 1 < len(splits) else len(full_text)
seg = full_text[start:end]
marriages.append(_parse_one_marriage(seg))
# Assign contract sources from italic paragraphs to the best-matching marriage
for it in (italic_texts or []):
src = _parse_contract_source(it)
if not src:
continue
# Prefer marriage with a date; fall back to last
target = next((m for m in reversed(marriages) if m["date"]), marriages[-1])
if not target["source"]:
target["source"] = src
return marriages
def parse_occupation(full_text: str) -> str:
    """
    Return the occupation stated as "<someone> sera <occupation>.".

    Only the first such sentence is used; '' when there is none.
    """
    found = RE_OCCU_SERA.search(full_text)
    return found.group(1).strip().rstrip(".") if found else ""
def _clean_place(raw: str) -> str:
"""Normalise a place string extracted from HTML text."""
if not raw:
return ""
# Strip trailing punctuation (keep closing paren if place has department in parens)
p = raw.strip().rstrip(".,;(")
p = re.sub(r"\s+", " ", p).strip()
# Trim at known sentence-ending words
p = re.split(r"\s+(?:Il|Elle|Ce|Ils|Leur|Le|La|Les|Un|Une|Son|Sa)\b", p, maxsplit=1)[0]
return p[:80] # GEDCOM line limit
# ── HTML / paragraph parsing ──────────────────────────────────────────────────
def extract_paragraphs(html_body: str) -> list[dict]:
    """
    Parse an HTML body into a list of paragraph dicts:
    {text, color, is_bold, bold_text, is_italic}

    text:      whitespace-collapsed visible text (empty blocks are dropped)
    color:     first explicit colour found on a descendant element, else
               the colour set on the block element itself, else "black";
               taken from an inline CSS `color:` declaration or a
               `color=` attribute and normalised to black/navy/red/gray
               where the value is one of the known names or hex codes
    is_bold:   True when the block contains non-empty <b>/<strong> text
    bold_text: that bold text, whitespace-collapsed
    is_italic: True when the block contains an <i>/<em> element and has
               no bold text
    """
    soup = BeautifulSoup(html_body, "html.parser")

    named = {"#000000": "black", "#000080": "navy", "navy": "navy",
             "red": "red", "#ff0000": "red",
             "gray": "gray", "grey": "gray", "#808080": "gray"}

    def tag_color(tag) -> str:
        style = tag.get("style", "") or ""
        # The look-behind keeps "background-color:" / "border-color:" from
        # being read as the text colour; "#?" lets hex values through.
        m = re.search(r"(?<![-\w])color\s*:\s*(#?\w+)", style, re.I)
        raw = m.group(1) if m else (tag.get("color", "") or "")
        raw = raw.strip().lower()
        return named.get(raw, raw)

    # Collect all block elements in document order:
    # - all <p> tags
    # - leaf <div> tags (no nested div children) — some families use divs
    #   instead of p
    # NOTE(review): a leaf <div> that wraps <p> elements is collected
    # together with those <p>s, so its text would appear twice — confirm
    # whether the source HTML ever nests them that way.
    block_tags = [
        tag for tag in soup.find_all(["p", "div"])
        if tag.name == "p" or not tag.find("div")
    ]

    paragraphs = []
    for p in block_tags:
        # Dominant colour: first explicit colour on a descendant ...
        color = ""
        for node in p.descendants:
            if hasattr(node, "get"):
                color = tag_color(node)
                if color:
                    break
        # ... otherwise the colour set on the block itself, else black.
        if not color:
            color = tag_color(p) or "black"

        # Bold detection — <b> or <strong>
        bold_spans = p.find_all(["b", "strong"])
        bold_text = " ".join(
            b.get_text(" ", strip=True) for b in bold_spans).strip()
        is_bold = bool(bold_text)

        # Full text
        full_text = p.get_text(" ", strip=True).replace("\xa0", " ").strip()
        full_text = re.sub(r"\s+", " ", full_text)

        # Italic detection — any <i>/<em> inside a block without bold text
        italic_spans = p.find_all(["i", "em"])
        is_italic = bool(italic_spans) and not is_bold

        if full_text:
            paragraphs.append({
                "text": full_text,
                "color": color,
                "is_bold": is_bold,
                "bold_text": re.sub(r"\s+", " ", bold_text),
                "is_italic": is_italic,
            })
    return paragraphs
def split_into_person_blocks(paragraphs: list[dict]) -> list[dict]:
    """
    Cut the paragraph list into person blocks at each bold ID line.

    Returns a list of {id, name_line, paras}. A block starts at a bold,
    black paragraph whose whole text is a person ID ("1", "1.2b", …); the
    ID paragraph itself is not stored. The first bold paragraph after it
    becomes the block's name_line. Paragraphs before the first block are
    dropped, except that a bold paragraph that looks like a name + birth
    sentence opens an implicit block with id "0" (root ancestor without
    an ID line).
    """
    blocks = []
    current = None

    for para in paragraphs:
        text = para["text"]
        bold_text = para["bold_text"]
        is_bold = para["is_bold"]

        # Standalone person-ID line: bold, black, and nothing but the ID.
        if is_bold and para["color"] in ("black", ""):
            candidate = re.sub(r"[\s\xa0]+", "", bold_text)
            full_stripped = re.sub(r"[\s\xa0]+", "", text)
            if RE_PERSON_ID.match(candidate) and RE_PERSON_ID.match(full_stripped):
                if current is not None:
                    blocks.append(current)
                current = {"id": candidate, "name_line": "", "paras": []}
                continue

        if not is_bold:
            # Plain paragraph: belongs to the open block, if any.
            if current is not None:
                current["paras"].append(para)
            continue

        if current is None:
            # Bold text before any ID: usually a title to skip, unless it
            # reads like "Name SURNAME voit le jour ...".
            if RE_NAME_LINE.match(bold_text) or RE_NAME_LINE.match(text):
                current = {"id": "0", "name_line": text, "paras": [para]}
            continue

        # Bold paragraph inside a block: the first one is the name line.
        if not current["name_line"]:
            current["name_line"] = text
        current["paras"].append(para)

    if current is not None:
        blocks.append(current)
    return blocks
def parse_block(block: dict, family_name: str) -> dict:
    """
    Convert a person block into a structured person dict.

    block:       {id, name_line, paras} as produced by
                 split_into_person_blocks().
    family_name: stored unchanged under "family".

    Returns a dict with keys: id, family, name, given, surname, sex,
    birth, death, marriages, occupation, birth_notes, death_notes,
    general_notes, children_inline, full_text. Marriage notes found in
    gray paragraphs are attached to the first marriage under "notes"; if
    no marriage was parsed, a minimal marriage entry is created to carry
    them.
    """
    person_id = block["id"]
    name_line = block["name_line"]
    all_paras = block["paras"]
    full_text = " ".join(p["text"] for p in all_paras)

    # ── Name ──
    # Prefer the bold text of the first paragraph (reliable) over regex
    # extraction from the name line.
    bold0 = (all_paras[0]["bold_text"] if all_paras else "").strip()
    if bold0 and not RE_PERSON_ID.match(re.sub(r"[\s\xa0]+", "", bold0)):
        name = bold0
    elif name_line:
        # Fallback: keep the portion before the birth keyword
        name = re.split(r"\s+(?:voit le jour|est né[e]?|naît|né[e]?\s)", name_line, maxsplit=1)[0].strip()
    else:
        name = ""
    # Split name into given/surname: surname is the ALL-CAPS part
    given, surname = _split_name(name)

    # ── Sex — use only the intro sentence (before spouse description) ──
    # Searching the whole block picks up spouse/child "fils légitime" etc.
    intro_end = re.search(r"\bs[''`]unit\s+avec\b", full_text)
    sex_context = full_text[: intro_end.start()] if intro_end else full_text[:500]
    sex = parse_sex(sex_context)
    # Secondary: "est née" / "est né" near the start of the text
    if not sex:
        if re.search(r"\best\s+née\b", full_text[:300], re.I):
            sex = "F"
        elif re.search(r"\best\s+né\b(?!e)", full_text[:300], re.I):
            sex = "M"
    # Last resort: first navy (M) or red (F) paragraph in the block
    if not sex and all_paras:
        for p in all_paras:
            if p["color"] == "navy":
                sex = "M"
                break
            if p["color"] == "red":
                sex = "F"
                break

    # ── Birth / Baptism ──
    birth = parse_birth(full_text)
    # ── Death ──
    death = parse_death(full_text)
    # ── Marriages (may be plural) ──
    italic_texts = [p["text"] for p in all_paras if p.get("is_italic")]
    marriages = parse_marriages(full_text, italic_texts)
    # ── Occupation ──
    occu = parse_occupation(full_text)

    # ── Gray paragraph groups (witnesses, godparents, general notes) ──
    grouped = _group_gray_notes(all_paras)
    # Attach marriage notes to the first marriage; create minimal entry if needed
    if grouped["marriage_notes"]:
        if marriages:
            marriages[0].setdefault("notes", []).extend(grouped["marriage_notes"])
        else:
            marriages = [{
                "date": "", "plac": "", "spouse": "",
                "spouse_birth": "", "spouse_death": "", "spouse_occu": "",
                "source": {}, "has_children_text": False,
                "notes": list(grouped["marriage_notes"]),
            }]

    # ── Children listed inline ──
    children_inline = _extract_children_inline(full_text)

    return {
        "id": person_id,
        "family": family_name,
        "name": name,
        "given": given,
        "surname": surname,
        "sex": sex,
        "birth": birth,
        "death": death,
        "marriages": marriages,
        "occupation": occu,
        "birth_notes": grouped["birth_notes"],
        "death_notes": grouped["death_notes"],
        "general_notes": grouped["general_notes"],
        "children_inline": children_inline,
        "full_text": full_text,
    }
def _split_name(name: str) -> tuple[str, str]:
"""Split 'Pierre FABRE' → ('Pierre', 'FABRE')."""
# Surname = longest contiguous run of uppercase tokens at the END
tokens = name.split()
surname_tokens = []
given_tokens = []
i = len(tokens) - 1
while i >= 0 and (tokens[i].upper() == tokens[i] or tokens[i] in ("de", "d'", "du", "des", "l'", "la", "le")):
surname_tokens.insert(0, tokens[i])
i -= 1
given_tokens = tokens[:i+1]
return " ".join(given_tokens), " ".join(surname_tokens)
def _extract_children_inline(text: str) -> list[str]:
"""
Extract names of children listed as '- Name né(e) en YEAR'
Returns list of first-name strings.
"""
children = []
for m in re.finditer(r"-\s+([A-ZÀ-Ü][a-zà-üA-ZÀ-Ü\s]+?)\s+né", text):
children.append(m.group(1).strip())
return children
# ── SQL extraction ────────────────────────────────────────────────────────────
def parse_sql_values(line: str) -> list:
    """
    Parse one '(v1, v2, ...)' row of an SQL INSERT into a list of values.

    The surrounding parentheses and a trailing ',' or ';' are removed
    first. Quoted strings are returned unescaped, NULL becomes None, and
    anything else (numbers, bare words) is returned as its stripped text.
    Inside a string both backslash escapes and the SQL doubled quote ('')
    are understood; an unknown backslash escape yields the escaped
    character itself.
    """
    s = line.strip()
    if s.startswith("("):
        s = s[1:]
    for sfx in (");", "),", ")"):
        if s.endswith(sfx):
            s = s[:-len(sfx)]
            break

    escapes = {"n": "\n", "r": "\r", "t": "\t", "b": "\x08",
               "\\": "\\", "'": "'", '"': '"', "0": "\x00", "Z": "\x1a"}

    values = []
    i, n = 0, len(s)
    while i < n:
        # Skip blanks in front of the value.
        while i < n and s[i] in " \t":
            i += 1
        if i >= n:
            break

        if s[i:i + 4] == "NULL":
            values.append(None)
            i += 4
        elif s[i] == "'":
            i += 1
            buf = []
            while i < n:
                c = s[i]
                if c == "\\" and i + 1 < n:
                    buf.append(escapes.get(s[i + 1], s[i + 1]))
                    i += 2
                elif c == "'":
                    if i + 1 < n and s[i + 1] == "'":
                        # '' inside a string is one literal quote
                        buf.append("'")
                        i += 2
                    else:
                        i += 1  # closing quote
                        break
                else:
                    buf.append(c)
                    i += 1
            values.append("".join(buf))
        else:
            # Unquoted value: runs up to the next comma.
            j = i
            while j < n and s[j] != ",":
                j += 1
            values.append(s[i:j].strip())
            i = j

        # Skip the separator(s) behind the value.
        while i < n and s[i] in " \t,":
            i += 1
    return values
def stream_filiation_nodes(sql_file: Path) -> dict[int, dict]:
    """Collect the Drupal 'book' pages whose title mentions 'filiation'.

    The dump is read once, line by line.  Titles and types come from
    ``drupal_node`` rows, bodies from ``drupal_node_revisions`` rows; a
    revision is only attached to a node that has already been seen, so the
    node table must precede the revision table in the dump.

    When a node has several revisions, the one whose ``vid`` equals the
    node's own ``vid`` is kept (assumes the Drupal 6 schema, where
    ``node.vid`` designates the current revision - TODO confirm against the
    dump).  If no such revision is found, the last revision read is used.

    Only ``INSERT INTO `table` (cols) VALUES`` statements whose rows follow
    on separate lines starting with ``(`` are understood.

    Returns ``{nid: {"title": str, "body": str}}``.
    """
    node_cols: list[str] = []
    rev_cols: list[str] = []
    nodes: dict[int, dict] = {}
    current_vid: dict[int, str | None] = {}   # nid -> vid declared by the node row
    pinned: set[int] = set()                  # nids whose current revision was found
    current_table = None
    targets = {"drupal_node", "drupal_node_revisions"}
    insert_re = re.compile(r"INSERT INTO `([^`]+)` \((.+)\) VALUES", re.I)

    with open(sql_file, encoding="utf-8", errors="replace") as fh:
        for line in fh:
            ls = line.rstrip("\r\n")
            m = insert_re.match(ls)
            if m:
                tname = m.group(1)
                current_table = tname if tname in targets else None
                if current_table is not None:
                    cols = [c.strip().strip("`") for c in m.group(2).split(",")]
                    if tname == "drupal_node":
                        node_cols = cols
                    else:
                        rev_cols = cols
                continue
            if current_table is None:
                continue

            stripped = ls.strip()
            if not stripped.startswith("("):
                if stripped.endswith(";"):
                    current_table = None
                continue

            row = parse_sql_values(stripped)
            if current_table == "drupal_node" and node_cols:
                d = dict(zip(node_cols, row))
                # A NULL title is parsed as None; treat it as empty.
                title = d.get("title") or ""
                if d.get("type") == "book" and "filiation" in title.lower():
                    try:
                        nid = int(d["nid"])
                    except (KeyError, ValueError, TypeError):
                        pass
                    else:
                        nodes[nid] = {"title": title, "body": ""}
                        current_vid[nid] = d.get("vid")
            elif current_table == "drupal_node_revisions" and rev_cols:
                d = dict(zip(rev_cols, row))
                try:
                    nid = int(d["nid"])
                except (KeyError, ValueError, TypeError):
                    nid = None
                if nid is not None and nid in nodes:
                    vid = d.get("vid")
                    is_current = vid is not None and vid == current_vid.get(nid)
                    if is_current or nid not in pinned:
                        nodes[nid]["body"] = d.get("body") or ""
                    if is_current:
                        pinned.add(nid)

            # Always honour the statement terminator, even for skipped rows.
            if stripped.endswith(";"):
                current_table = None
    return nodes
# ── Family grouping ───────────────────────────────────────────────────────────
def family_name_from_title(title: str) -> str:
    """Derive the family name from a page title.

    ``'Les filiations FABRE : Génération 3'`` gives ``'FABRE'``.

    The literal ``Les filiations`` is removed, everything from the first
    ``:`` onwards is dropped, and one leading particle is stripped:
    ``d'`` / ``l'`` (straight or typographic apostrophe), or ``de`` / ``du`` /
    ``des`` followed by whitespace.  The whitespace requirement matters:
    surnames that merely start with those letters (DELMAS, DESCHAMPS) are
    left intact.
    """
    name = title.replace("Les filiations", "").strip()
    # Keep only what precedes the ':' suffix (generation label).
    name = name.split(":")[0].strip()
    name = re.sub(
        r"^(?:d['\u2019]\s*|de\s+|du\s+|des\s+|l['\u2019]\s*)",
        "",
        name,
        flags=re.I,
    )
    return name.strip()
def generation_number(title: str) -> int:
    """Return the number following 'Génération' in *title*, or 0 if absent."""
    found = re.search(r"[Gg]én[eé]ration\s+(\d+)", title)
    if found is None:
        return 0
    return int(found.group(1))
def group_by_family(nodes: dict[int, dict]) -> dict[str, list[dict]]:
    """Bucket pages by family name.

    Returns ``{family_name: [{nid, title, body, gen}, ...]}``.  Pages whose
    title contains 'présentation' and pages yielding an empty family name are
    left out.  Within a family, pages are ordered by generation number, with
    generation-0 pages placed last.
    """
    grouped: dict[str, list] = {}
    for nid, info in nodes.items():
        title = info["title"]
        if "présentation" in title.lower():
            # Introductory page, carries no lineage data.
            continue
        family = family_name_from_title(title)
        if not family:
            continue
        entry = {
            "nid": nid,
            "title": title,
            "body": info["body"],
            "gen": generation_number(title),
        }
        grouped.setdefault(family, []).append(entry)

    for entries in grouped.values():
        entries.sort(key=lambda e: (e["gen"] == 0, e["gen"]))
    return grouped
# ── Cross-page person assembly ────────────────────────────────────────────────
def parent_id(person_id: str) -> str | None:
"""
Given a person ID like '1.4.4b.1', return the parent's ID '1.4.4b'.
Returns None for root.
"""
parts = person_id.rsplit(".", 1)
if len(parts) == 1:
return None
return parts[0] if parts[0] else None
def child_union_letter(person_id: str) -> str:
    """Return the lower-cased union letter(s) of the last ID component.

    ``'5.1.7.1a.5b.3a.7.1a'`` gives ``'a'`` (child of union "a" of the parent);
    ``'5.1.7.1a.5b.3a.7.2'`` gives ``''`` (no explicit union letter).
    An empty string is also returned when the last component is not digits
    followed by optional letters.
    """
    _, _, tail = person_id.rpartition(".")
    found = re.match(r"^\d+([a-z]*)$", tail, re.I)
    if not found:
        return ""
    return found.group(1).lower()
# ── GEDCOM generation ─────────────────────────────────────────────────────────
# Running sequence numbers behind the @Innnn@ / @Fnnnn@ / @Snnnn@ / @Rnnnn@
# cross-references; each is incremented by its new_*() allocator.
_indi_counter = 0
_fam_counter = 0
_sour_counter = 0
_repo_counter = 0
# Source / repository registries, rebound to empty dicts for every output file.
_sour_registry: dict[tuple, str] = {}  # (title_lc, depot_lc, caln_lc) -> source xref
_repo_registry: dict[str, str] = {}  # depot_lc -> repository xref
_sour_records: dict[str, dict] = {}  # source xref -> {title, repo_xref, caln}
_repo_records: dict[str, str] = {}  # repository xref -> repository name
def new_indi() -> str:
    """Advance the individual counter and return the matching @Innnn@ xref."""
    global _indi_counter
    _indi_counter = _indi_counter + 1
    return f"@I{_indi_counter:04d}@"
def new_fam() -> str:
    """Advance the family counter and return the matching @Fnnnn@ xref."""
    global _fam_counter
    _fam_counter = _fam_counter + 1
    return f"@F{_fam_counter:04d}@"
def new_sour() -> str:
    """Advance the source counter and return the matching @Snnnn@ xref."""
    global _sour_counter
    _sour_counter = _sour_counter + 1
    return f"@S{_sour_counter:04d}@"
def new_repo() -> str:
    """Advance the repository counter and return the matching @Rnnnn@ xref."""
    global _repo_counter
    _repo_counter = _repo_counter + 1
    return f"@R{_repo_counter:04d}@"
def _get_or_create_repo(depot: str) -> str:
    """Return the repository xref for *depot*, registering it on first use.

    Lookup is on the stripped, lower-cased name; the stripped name in its
    original case is what gets recorded for output.
    """
    name = depot.strip()
    key = name.lower()
    repo_xref = _repo_registry.get(key)
    if repo_xref is None:
        repo_xref = new_repo()
        _repo_registry[key] = repo_xref
        _repo_records[repo_xref] = name
    return repo_xref
def _get_or_create_sour(title: str, depot: str, caln: str) -> str:
    """Return the source xref for (*title*, *depot*, *caln*), creating it once.

    The three fields are compared stripped and lower-cased, so spelling
    variants in case or surrounding blanks share one source record.  A
    repository is registered (or reused) only when *depot* is non-blank;
    otherwise the record's ``repo_xref`` is the empty string.
    """
    title_clean = title.strip()
    depot_clean = depot.strip()
    caln_clean = caln.strip()
    key = (title_clean.lower(), depot_clean.lower(), caln_clean.lower())
    if key in _sour_registry:
        return _sour_registry[key]

    source_xref = new_sour()
    _sour_registry[key] = source_xref
    # Test the stripped value: a whitespace-only depot must not create a
    # repository record with an empty name.
    repo_xref = _get_or_create_repo(depot_clean) if depot_clean else ""
    _sour_records[source_xref] = {
        "title": title_clean,
        "repo_xref": repo_xref,
        "caln": caln_clean,
    }
    return source_xref
def gedcom_line(level: int, tag: str, value: str = "") -> str:
    """Format one GEDCOM line: ``LEVEL TAG`` plus `` VALUE`` when non-empty."""
    head = f"{level} {tag}"
    return head + f" {value}" if value else head
def _vital_event_lines(tag: str, event: dict, notes: list) -> list[str]:
    """Build the level-1 event structure (*tag*) for a birth-like or death event.

    Nothing is produced when the event has neither date nor place and there
    are no notes.  The place string is passed through ``_split_place``; the
    first element of its result is emitted as TIME (below DATE when a date
    exists, otherwise directly below the event) and the second is handed to
    ``_expand_place`` to obtain the PLAC value.  Notes follow at level 2.
    """
    has_data = event.get("date") or event.get("plac")
    if not (has_data or notes):
        return []

    out = [gedcom_line(1, tag)]
    if has_data:
        time_part, raw_place = _split_place(event.get("plac", ""))
        if event.get("date"):
            out.append(gedcom_line(2, "DATE", event["date"]))
            if time_part:
                out.append(gedcom_line(3, "TIME", time_part))
        elif time_part:
            out.append(gedcom_line(2, "TIME", time_part))
        place = _expand_place(raw_place)
        if place:
            out.append(gedcom_line(2, "PLAC", place))
    out.extend(_emit_grouped_notes(notes, base_level=2))
    return out


def person_to_gedcom(person: dict, indi_ref: str,
                     famc: list[str], fams: list[str]) -> list[str]:
    """Build the GEDCOM INDI record lines for one person.

    Args:
        person: parsed person; the keys read here are ``given``, ``surname``,
            ``sex``, ``birth``, ``death``, ``occupation``, ``birth_notes``,
            ``death_notes`` and ``general_notes`` (all optional).
        indi_ref: xref of the record, e.g. ``@I0001@``.
        famc: xrefs of the families in which the person is a child.
        fams: xrefs of the families in which the person is a spouse.

    Returns:
        The record lines in this order: NAME, SEX, birth event, DEAT, OCCU,
        FAMC, FAMS, then the general notes at level 1.
    """
    lines = [gedcom_line(0, indi_ref, "INDI")]

    # Name.  strip() avoids a stray leading blank when only the surname is known.
    given = person.get("given", "")
    surname = person.get("surname", "")
    full = f"{given} /{surname}/".strip() if surname else given
    if full:
        lines.append(gedcom_line(1, "NAME", full))
        if given:
            lines.append(gedcom_line(2, "GIVN", given))
        if surname:
            lines.append(gedcom_line(2, "SURN", surname))

    # Sex
    sex = person.get("sex", "")
    if sex:
        lines.append(gedcom_line(1, "SEX", sex))

    # Birth / baptism: the event dict may name its own tag under "type";
    # fall back to BIRT when that entry is missing or empty.
    birth = person.get("birth") or {}
    lines.extend(_vital_event_lines(
        birth.get("type") or "BIRT", birth, person.get("birth_notes", [])))

    # Death
    death = person.get("death") or {}
    lines.extend(_vital_event_lines(
        "DEAT", death, person.get("death_notes", [])))

    # Occupation
    occu = person.get("occupation", "")
    if occu:
        lines.append(gedcom_line(1, "OCCU", occu))

    # Family links
    lines.extend(gedcom_line(1, "FAMC", fc) for fc in famc)
    lines.extend(gedcom_line(1, "FAMS", fs) for fs in fams)

    # General notes (INDI level)
    lines.extend(_emit_grouped_notes(person.get("general_notes", []), base_level=1))
    return lines
def _pid_sort_key(pid: str) -> list[tuple[int, int, str]]:
    """Natural ordering key for hierarchical IDs, so '1.2' sorts before '1.10'."""
    key = []
    for part in pid.split("."):
        m = re.match(r"(\d+)(.*)", part, re.S)
        if m:
            key.append((0, int(m.group(1)), m.group(2).lower()))
        else:
            # Component without a leading number: sorted after numbered ones.
            key.append((1, 0, part))
    return key


def _event_has_data(event) -> bool:
    """Tell whether an event dict carries a date or a place."""
    return bool(event) and bool(event.get("date") or event.get("plac"))


def _merge_person(existing: dict, incoming: dict) -> None:
    """Fold a later occurrence of the same person ID into *existing* (in place)."""
    for field in ("birth", "death"):
        old, new = existing.get(field), incoming.get(field)
        # Take the new event when nothing was stored, or when what was stored
        # has neither date nor place while the new one does.
        if new and (not old or (not _event_has_data(old) and _event_has_data(new))):
            existing[field] = new
    if not existing.get("occupation") and incoming.get("occupation"):
        existing["occupation"] = incoming["occupation"]
    for notes_field in ("birth_notes", "death_notes", "general_notes"):
        existing.setdefault(notes_field, []).extend(incoming.get(notes_field, []))
    # Marriages: append only spouses not already known (case-insensitive).
    known = {m.get("spouse", "").lower() for m in existing.get("marriages", [])}
    for marriage in incoming.get("marriages") or []:
        spouse_lc = marriage.get("spouse", "").lower()
        if spouse_lc not in known:
            existing.setdefault("marriages", []).append(marriage)
            known.add(spouse_lc)
    if not existing.get("sex") and incoming.get("sex"):
        existing["sex"] = incoming["sex"]


def build_gedcom_for_family(family_name: str,
                            pages: list[dict]) -> list[str]:
    """Parse all pages of one family and emit the GEDCOM record lines.

    Steps:
      1. Parse every page body into person dicts keyed by hierarchical ID;
         a person met on several pages is merged.
      2. Give each person an INDI xref, in natural ID order.
      3. Link children to parents through the ID hierarchy and the union
         letter of the child's last ID component, creating one FAM per
         marriage of a parent.  A parent with children but no parsed
         marriage gets one FAM per union letter so the children stay linked.
      4. Emit INDI records, FAM records, spouse INDI records (spouses are
         identified by lower-cased name), then REPO and SOUR records.

    Args:
        family_name: name passed on to ``parse_block`` for every block.
        pages: dicts with at least a ``body`` entry (page HTML/text).

    Returns:
        The GEDCOM lines without header and trailer; an empty list when no
        person could be parsed.
    """
    # ── Step 1: parse all pages into a flat persons dict ──
    persons_by_id: dict[str, dict] = {}
    for page in pages:
        body = page["body"]
        if not body.strip():
            continue
        for block in split_into_person_blocks(extract_paragraphs(body)):
            person = parse_block(block, family_name)
            pid = person["id"]
            if pid in persons_by_id:
                # Later pages may have more detail.
                _merge_person(persons_by_id[pid], person)
            else:
                persons_by_id[pid] = person

    if not persons_by_id:
        return []

    # ── Step 2: assign INDI xrefs ──
    ordered_pids = sorted(persons_by_id, key=_pid_sort_key)
    xref: dict[str, str] = {pid: new_indi() for pid in ordered_pids}

    # ── Step 3: resolve parent→child links and union letters ──
    # "0" is the implicit root: top-level IDs hang below it when it exists.
    has_root = "0" in persons_by_id

    def resolve_parent(pid: str) -> str | None:
        """Return the ID of the known parent of *pid*, or None."""
        if pid == "0":
            return None
        par = parent_id(pid)
        if par is None and has_root:
            par = "0"
        return par if par and par in persons_by_id else None

    child_ul: dict[str, str] = {}                    # child -> "" | "a" | "b" ...
    parent_union_letters: dict[str, list[str]] = {}  # parent -> sorted letters
    for pid in persons_by_id:
        par = resolve_parent(pid)
        if par is None:
            continue
        ul = child_union_letter(pid)
        child_ul[pid] = ul
        letters = parent_union_letters.setdefault(par, [])
        if ul not in letters:
            letters.append(ul)
    for letters in parent_union_letters.values():
        letters.sort()

    fam_keys_cache: dict[str, list[str]] = {}

    def fam_keys_for_parent(par_pid: str) -> list[str]:
        """Return the ordered fam_keys ("parent#suffix") of this parent.

        With marriages: one key per marriage, in text order.  Marriages
        flagged ``has_children_text`` receive the union letters seen among
        the children (in sorted order); the others get a synthetic
        ``childlessN`` suffix.  Without marriages: one key per union letter.
        """
        cached = fam_keys_cache.get(par_pid)
        if cached is not None:
            return cached

        marriages = persons_by_id[par_pid].get("marriages") or []
        union_letters = parent_union_letters.get(par_pid, [])
        if not marriages:
            keys = [f"{par_pid}#{ul}" for ul in union_letters]
        elif len(marriages) == 1:
            first = union_letters[0] if union_letters else ""
            keys = [f"{par_pid}#{first}"]
        else:
            with_children = [m for m in marriages if m.get("has_children_text")]
            # Fallback: if text detection failed, assume the last marriage
            # is the one that produced the children.
            if not with_children and union_letters:
                with_children = [marriages[-1]]
            keys = []
            ul_iter = iter(union_letters)
            childless_count = 0
            for m in marriages:
                if m in with_children:
                    ul = next(ul_iter, f"_ul{len(keys)}")
                    keys.append(f"{par_pid}#{ul}")
                else:
                    keys.append(f"{par_pid}#childless{childless_count}")
                    childless_count += 1

        fam_keys_cache[par_pid] = keys
        return keys

    fam_xrefs: dict[str, str] = {}   # fam_key -> @Fxxxx@
    famc_fam: dict[str, str] = {}    # child_pid -> fam_key
    for pid in persons_by_id:
        par = resolve_parent(pid)
        if par is None:
            continue
        par_keys = fam_keys_for_parent(par)
        if not par_keys:
            continue
        for fk in par_keys:
            if fk not in fam_xrefs:
                fam_xrefs[fk] = new_fam()
        # Match the child to the key carrying its union letter; fall back to
        # the parent's first family when no key matches.
        ul = child_ul.get(pid, "")
        famc_fam[pid] = next(
            (fk for fk in par_keys if fk.partition("#")[2] == ul),
            par_keys[0],
        )

    # Also create FAM records for persons who have marriages but no children in the tree.
    for par_pid, person in persons_by_id.items():
        if not person.get("marriages"):
            continue
        for fk in fam_keys_for_parent(par_pid):
            if fk not in fam_xrefs:
                fam_xrefs[fk] = new_fam()

    fam_children: dict[str, list[str]] = {}
    for child_pid, fk in famc_fam.items():
        fam_children.setdefault(fk, []).append(child_pid)

    person_famc: dict[str, str] = {
        pid: fam_xrefs[fk] for pid, fk in famc_fam.items() if fk in fam_xrefs
    }
    person_fams: dict[str, list[str]] = {}
    for fk, fam_ref in fam_xrefs.items():
        person_fams.setdefault(fk.split("#")[0], []).append(fam_ref)

    # ── Step 4: emit GEDCOM ──
    lines: list[str] = []

    # INDI records for known persons
    for pid in ordered_pids:
        famc_list = [person_famc[pid]] if pid in person_famc else []
        lines += person_to_gedcom(
            persons_by_id[pid], xref[pid], famc_list, person_fams.get(pid, []))

    # spouse_data: name_lc -> {xref, name, sex, birth, death, occu, fams}
    spouse_data: dict[str, dict] = {}

    # FAM records, one per fam_key
    for fam_key, fam_ref in fam_xrefs.items():
        par_pid = fam_key.split("#")[0]
        parent = persons_by_id.get(par_pid, {})
        par_sex = parent.get("sex", "")
        par_xref = xref.get(par_pid, "")

        # Identify which marriage this FAM corresponds to.
        par_keys = fam_keys_for_parent(par_pid)
        marr_idx = par_keys.index(fam_key) if fam_key in par_keys else 0
        marriages = parent.get("marriages") or []
        marr = marriages[marr_idx] if marr_idx < len(marriages) else {}

        lines.append(gedcom_line(0, fam_ref, "FAM"))

        # The tree member is WIFE only when known to be female; the spouse
        # always takes the other role so a FAM never carries two HUSB lines.
        parent_is_wife = par_sex == "F"
        lines.append(gedcom_line(1, "WIFE" if parent_is_wife else "HUSB", par_xref))

        spouse_name = (marr.get("spouse") or "").strip()
        if spouse_name:
            skey = spouse_name.lower()
            sd = spouse_data.get(skey)
            if sd is None:
                sd = {
                    "xref": new_indi(),
                    "name": spouse_name,
                    # Opposite of the parent's sex; unknown stays unknown.
                    "sex": {"M": "F", "F": "M"}.get(par_sex, ""),
                    "birth": "", "death": "", "occu": "", "fams": [],
                }
                spouse_data[skey] = sd
            if not sd["birth"] and marr.get("spouse_birth"):
                sd["birth"] = marr["spouse_birth"]
            if not sd["death"] and marr.get("spouse_death"):
                sd["death"] = marr["spouse_death"]
            if not sd["occu"] and marr.get("spouse_occu"):
                sd["occu"] = marr["spouse_occu"]
            sd["fams"].append(fam_ref)
            lines.append(gedcom_line(
                1, "HUSB" if parent_is_wife else "WIFE", sd["xref"]))

        # Marriage event (also emitted when only a source citation is known,
        # so the citation is not lost).
        marr_notes = marr.get("notes", [])
        src = marr.get("source") or {}
        if marr.get("date") or marr.get("plac") or marr_notes or src.get("title"):
            lines.append(gedcom_line(1, "MARR"))
            time_part, raw_place = _split_place(marr.get("plac", ""))
            if marr.get("date"):
                lines.append(gedcom_line(2, "DATE", marr["date"]))
                if time_part:
                    lines.append(gedcom_line(3, "TIME", time_part))
            elif time_part:
                lines.append(gedcom_line(2, "TIME", time_part))
            place = _expand_place(raw_place)
            if place:
                lines.append(gedcom_line(2, "PLAC", place))
            if src.get("title"):
                sour_xref = _get_or_create_sour(
                    src["title"], src.get("depot", ""), src.get("caln", ""))
                lines.append(gedcom_line(2, "SOUR", sour_xref))
                if src.get("page"):
                    lines.append(gedcom_line(3, "PAGE", src["page"]))
            lines.extend(_emit_grouped_notes(marr_notes, base_level=2))

        # Children of this FAM, in natural ID order (child 2 before child 10).
        for child_pid in sorted(fam_children.get(fam_key, []), key=_pid_sort_key):
            child_xref = xref.get(child_pid, "")
            if child_xref:
                lines.append(gedcom_line(1, "CHIL", child_xref))

    # Spouse INDI records, emitted AFTER the FAM loop so spouse_data is complete
    for sd in spouse_data.values():
        s_name = sd["name"]
        given, surname = _split_name(s_name)
        # No given name found (e.g. name entirely in upper case): retry on
        # the title-cased name.
        if not given:
            given, surname = _split_name(s_name.title())
        full = f"{given} /{surname}/" if surname else (given or s_name)
        lines.append(gedcom_line(0, sd["xref"], "INDI"))
        lines.append(gedcom_line(1, "NAME", full))
        if given:
            lines.append(gedcom_line(2, "GIVN", given))
        if surname:
            lines.append(gedcom_line(2, "SURN", surname))
        if sd["sex"]:
            lines.append(gedcom_line(1, "SEX", sd["sex"]))
        if sd["birth"]:
            lines.append(gedcom_line(1, "BIRT"))
            lines.append(gedcom_line(2, "DATE", sd["birth"]))
        if sd["death"]:
            lines.append(gedcom_line(1, "DEAT"))
            lines.append(gedcom_line(2, "DATE", sd["death"]))
        if sd["occu"]:
            lines.append(gedcom_line(1, "OCCU", sd["occu"]))
        for fref in sd["fams"]:
            lines.append(gedcom_line(1, "FAMS", fref))

    # REPO records
    for repo_xref, repo_name in _repo_records.items():
        lines.append(gedcom_line(0, repo_xref, "REPO"))
        lines.append(gedcom_line(1, "NAME", repo_name))

    # SOUR records; the call number is subordinate to the repository link.
    for sour_xref, srec in _sour_records.items():
        lines.append(gedcom_line(0, sour_xref, "SOUR"))
        lines.append(gedcom_line(1, "TITL", srec["title"]))
        if srec["repo_xref"]:
            lines.append(gedcom_line(1, "REPO", srec["repo_xref"]))
            if srec["caln"]:
                lines.append(gedcom_line(2, "CALN", srec["caln"]))

    return lines
def build_gedcom_file(family_name: str, pages: list[dict]) -> str:
    """Assemble the full text of the GEDCOM file for one family.

    All xref counters and the source/repository registries are reset first,
    so numbering restarts at 1 in every file.  The result is the header, the
    records built from *pages*, and the trailer, joined with newlines and
    ending with a newline.
    """
    global _indi_counter, _fam_counter, _sour_counter, _repo_counter
    global _sour_registry, _repo_registry, _sour_records, _repo_records

    # Fresh per-file state
    _indi_counter = _fam_counter = _sour_counter = _repo_counter = 0
    _sour_registry, _repo_registry = {}, {}
    _sour_records, _repo_records = {}, {}

    records = build_gedcom_for_family(family_name, pages)

    document = [
        "0 HEAD",
        "1 SOUR BaseCGL",
        f"2 NAME {SOURCE_STR}",
        "1 GEDC",
        "2 VERS 5.5.1",
        "2 FORM LINEAGE-LINKED",
        "1 CHAR UTF-8",
        f"1 NOTE Filiations {family_name} export automatique depuis les pages Drupal",
        *records,
        "0 TRLR",
    ]
    return "\n".join(document) + "\n"
# ── Main ──────────────────────────────────────────────────────────────────────
def main():
    """Export every family found in the SQL dump to its own .ged file.

    Reads ``SQL_FILE``, groups the filiation pages by family, writes
    ``filiations_<family>.ged`` files into ``OUT_DIR`` (created if needed,
    including missing parents) and prints a short report for each file.
    """
    OUT_DIR.mkdir(parents=True, exist_ok=True)
    print(f"Streaming {SQL_FILE}", flush=True)
    nodes = stream_filiation_nodes(SQL_FILE)
    print(f" Found {len(nodes)} filiation book pages")
    families = group_by_family(nodes)
    print(f" Found {len(families)} families: {', '.join(sorted(families))}")

    total_persons = 0
    for fname in sorted(families):
        pages = families[fname]
        # Root (generation 0) pages are processed before the generation pages.
        gen_pages = [p for p in pages if p["gen"] > 0]
        root_pages = [p for p in pages if p["gen"] == 0]
        all_pages = root_pages + gen_pages
        print(f"\n{fname}: {len(all_pages)} pages ({len(gen_pages)} generations)")

        gedcom = build_gedcom_file(fname, all_pages)
        # Spaces become underscores, apostrophes are dropped, and path
        # separators are neutralised so the name stays inside OUT_DIR.
        safe_name = (fname.replace(' ', '_').replace(chr(39), '')
                     .replace('/', '_').replace('\\', '_'))
        out_path = OUT_DIR / f"filiations_{safe_name}.ged"
        out_path.write_text(gedcom, encoding="utf-8")

        # Count INDI / FAM records by their level-0 xref prefix, looking at
        # line starts only so text inside a value is never counted.
        record_lines = gedcom.splitlines()
        n_indi = sum(1 for ln in record_lines if ln.startswith("0 @I"))
        n_fam = sum(1 for ln in record_lines if ln.startswith("0 @F"))
        # Size of the UTF-8 encoding, not the number of characters.
        n_bytes = len(gedcom.encode("utf-8"))
        total_persons += n_indi
        print(f"{out_path.name} ({n_indi} INDI, {n_fam} FAM, {n_bytes:,} bytes)")

    print(f"\nDone. Total individuals across all families: {total_persons}")
    print(f"Output directory: {OUT_DIR}")


if __name__ == "__main__":
    main()