#!/usr/bin/env python3 """ Drupal 6 'lignées familiales' → GEDCOM 5.5.1 exporter. Reads basesgen.sql (drupal_node + drupal_node_revisions, type='book') and writes one .ged file per family under ./gedcom_output/. Persons are identified by their hierarchical ID (e.g. "1.4.4b.1"): - dots separate parent→child relationships - a letter suffix on the last component (a/b/c…) identifies which union of the parent produced this child Usage: python3 export_lignees_to_gedcom.py """ import re import sys import html from pathlib import Path from bs4 import BeautifulSoup # ── Configuration ───────────────────────────────────────────────────────────── SQL_FILE = Path("/home/yann64/BaseCGL/basesgen.sql") OUT_DIR = Path("/home/yann64/BaseCGL/gedcom_output") SOURCE_STR = "CGL Bases généalogiques du Languedoc – basesgen.sql" # ── French place hierarchy ──────────────────────────────────────────────────── # Maps département name → région name (post-2016 reform, mainland + DOM). # Keys are title-cased; lookup is case-insensitive (see _expand_place). _DEPT_TO_REGION: dict[str, str] = { # Auvergne-Rhône-Alpes "Ain": "Auvergne-Rhône-Alpes", "Allier": "Auvergne-Rhône-Alpes", "Ardèche": "Auvergne-Rhône-Alpes", "Cantal": "Auvergne-Rhône-Alpes", "Drôme": "Auvergne-Rhône-Alpes", "Isère": "Auvergne-Rhône-Alpes", "Loire": "Auvergne-Rhône-Alpes", "Haute-Loire": "Auvergne-Rhône-Alpes", "Puy-de-Dôme": "Auvergne-Rhône-Alpes", "Rhône": "Auvergne-Rhône-Alpes", "Savoie": "Auvergne-Rhône-Alpes", "Haute-Savoie": "Auvergne-Rhône-Alpes", # Bourgogne-Franche-Comté "Côte-d'Or": "Bourgogne-Franche-Comté", "Doubs": "Bourgogne-Franche-Comté", "Jura": "Bourgogne-Franche-Comté", "Nièvre": "Bourgogne-Franche-Comté", "Haute-Saône": "Bourgogne-Franche-Comté", "Saône-et-Loire": "Bourgogne-Franche-Comté", "Yonne": "Bourgogne-Franche-Comté", "Territoire de Belfort": "Bourgogne-Franche-Comté", # Bretagne "Côtes-d'Armor": "Bretagne", "Finistère": "Bretagne", "Ille-et-Vilaine": "Bretagne", "Morbihan": "Bretagne", # Centre-Val de Loire "Cher": "Centre-Val de Loire", "Eure-et-Loir": "Centre-Val de Loire", "Indre": "Centre-Val de Loire", "Indre-et-Loire": "Centre-Val de Loire", "Loir-et-Cher": "Centre-Val de Loire", "Loiret": "Centre-Val de Loire", # Corse "Corse-du-Sud": "Corse", "Haute-Corse": "Corse", # Grand Est "Ardennes": "Grand Est", "Aube": "Grand Est", "Marne": "Grand Est", "Haute-Marne": "Grand Est", "Meurthe-et-Moselle": "Grand Est", "Meuse": "Grand Est", "Moselle": "Grand Est", "Bas-Rhin": "Grand Est", "Haut-Rhin": "Grand Est", "Vosges": "Grand Est", # Hauts-de-France "Aisne": "Hauts-de-France", "Nord": "Hauts-de-France", "Oise": "Hauts-de-France", "Pas-de-Calais": "Hauts-de-France", "Somme": "Hauts-de-France", # Île-de-France "Paris": "Île-de-France", "Ville-de-Paris": "Île-de-France", "Seine-et-Marne": "Île-de-France", "Yvelines": "Île-de-France", "Essonne": "Île-de-France", "Hauts-de-Seine": "Île-de-France", "Seine-Saint-Denis": "Île-de-France", "Val-de-Marne": "Île-de-France", "Val-d'Oise": "Île-de-France", # Normandie "Calvados": "Normandie", "Eure": "Normandie", "Manche": "Normandie", "Orne": "Normandie", "Seine-Maritime": "Normandie", # Nouvelle-Aquitaine "Charente": "Nouvelle-Aquitaine", "Charente-Maritime": "Nouvelle-Aquitaine", "Corrèze": "Nouvelle-Aquitaine", "Creuse": "Nouvelle-Aquitaine", "Dordogne": "Nouvelle-Aquitaine", "Gironde": "Nouvelle-Aquitaine", "Landes": "Nouvelle-Aquitaine", "Lot-et-Garonne": "Nouvelle-Aquitaine", "Pyrénées-Atlantiques": "Nouvelle-Aquitaine", "Deux-Sèvres": "Nouvelle-Aquitaine", "Vienne": "Nouvelle-Aquitaine", "Haute-Vienne": "Nouvelle-Aquitaine", # Occitanie "Ariège": "Occitanie", "Aude": "Occitanie", "Aveyron": "Occitanie", "Gard": "Occitanie", "Haute-Garonne": "Occitanie", "Gers": "Occitanie", "Hérault": "Occitanie", "Lot": "Occitanie", "Lozère": "Occitanie", "Hautes-Pyrénées": "Occitanie", "Pyrénées-Orientales": "Occitanie", "Tarn": "Occitanie", "Tarn-et-Garonne": "Occitanie", # Pays de la Loire "Loire-Atlantique": "Pays de la Loire", "Maine-et-Loire": "Pays de la Loire", "Mayenne": "Pays de la Loire", "Sarthe": "Pays de la Loire", "Vendée": "Pays de la Loire", # Provence-Alpes-Côte d'Azur "Alpes-de-Haute-Provence": "Provence-Alpes-Côte d'Azur", "Hautes-Alpes": "Provence-Alpes-Côte d'Azur", "Alpes-Maritimes": "Provence-Alpes-Côte d'Azur", "Bouches-du-Rhône": "Provence-Alpes-Côte d'Azur", "Var": "Provence-Alpes-Côte d'Azur", "Vaucluse": "Provence-Alpes-Côte d'Azur", # DOM "Guadeloupe": "Guadeloupe", "Martinique": "Martinique", "Guyane": "Guyane", "La Réunion": "La Réunion", "Mayotte": "Mayotte", # Historical names (pre-reform or pre-1969) "Basses-Pyrénées": "Nouvelle-Aquitaine", # now Pyrénées-Atlantiques "Basses Pyrénées": "Nouvelle-Aquitaine", "Seine": "Île-de-France", # dissolved in 1968 "Seine-et-Oise": "Île-de-France", # dissolved in 1968 } # Case-insensitive lookup index _DEPT_LOWER: dict[str, str] = {k.lower(): v for k, v in _DEPT_TO_REGION.items()} # Canonical name index (for display — preserves original casing) _DEPT_CANONICAL: dict[str, str] = {k.lower(): k for k in _DEPT_TO_REGION} # Matches a time prefix with a city following: "12 h 00 à City" / "15h au Mas" _RE_TIME_WITH_CITY = re.compile( r"^(\d+)\s*h(?:\s*(\d+))?\s+(?:aux?|[àa])\s+(.+)$", re.I) # Matches a pure time string with no city _RE_TIME_ONLY = re.compile(r"^(\d+)\s*h(?:\s*(\d+))?\s*$", re.I) def _split_place(raw: str) -> tuple[str, str]: """ Split a raw place string into (gedcom_time, place_without_prefix). Returns: - ('HH:MM', 'City (Dept)') for '12 h 00 à City (Dept)' - ('HH:MM', '') for '2 h 30' (pure time, no city) - ('', raw) when no time prefix is found """ if not raw: return "", "" m = _RE_TIME_WITH_CITY.match(raw) if m: hours, minutes = int(m.group(1)), int(m.group(2) or 0) return f"{hours:02d}:{minutes:02d}", m.group(3).strip() m = _RE_TIME_ONLY.match(raw) if m: hours, minutes = int(m.group(1)), int(m.group(2) or 0) return f"{hours:02d}:{minutes:02d}", "" return "", raw def _expand_place(place: str) -> str: """ Convert 'Montpellier (Hérault)' → 'Montpellier, Hérault, Occitanie, France'. 'Camarade (Ariège) - Machicot' → 'Machicot, Camarade, Ariège, Occitanie, France'. Call _split_place() first to strip any time prefix before passing here. """ if not place: return "" m = re.search(r"^(.*?)\s*\(([^)]+)\)\s*(?:-\s*(.+))?$", place) if not m: return place # no parenthetical département — return as-is city = m.group(1).strip() dept_raw = m.group(2).strip() subdivision = m.group(3).strip() if m.group(3) else "" dept_key = dept_raw.lower() region = _DEPT_LOWER.get(dept_key, "") dept_display = _DEPT_CANONICAL.get(dept_key, dept_raw) parts = [] if subdivision: parts.append(subdivision) parts.append(city) parts.append(dept_display) if region: parts.append(region) parts.append("France") return ", ".join(parts) # ── Marriage contract / source parsing ─────────────────────────────────────── _RE_CONTRACT_SOURCE = re.compile( r"^(contrat\s+de\s+mariage\s+.+?)\s+-\s+(.+?)\s+-\s+(.+)$", re.I) _RE_FOLIO = re.compile(r"\b(folio\s+\S+(?:\s+\S+)*)\s*$", re.I) def _parse_contract_source(text: str) -> dict: """ Parse 'Contrat de Mariage chez Maître X - Dépôt - Cote folio N'. Returns {"title", "depot", "caln", "page"} or {}. """ m = _RE_CONTRACT_SOURCE.match(text.strip()) if not m: return {} title = m.group(1).strip() depot = m.group(2).strip() cote_raw = m.group(3).strip() fm = _RE_FOLIO.search(cote_raw) if fm: page = fm.group(1) caln = cote_raw[:fm.start()].strip() else: page = "" caln = cote_raw return {"title": title, "depot": depot, "caln": caln, "page": page} # ── Spouse context parsing ───────────────────────────────────────────────────── _RE_SPOUSE_CONTEXT = re.compile( r"s[''`]unit\s+avec\s+(.+?)(?=\.\s+(?:Ce couple|Ils\s+se|Le couple)|\.?\s*$)", re.I | re.S) _RE_SPOUSE_LIFE = re.compile( r"\((~?\d{4})\s*(?:-?>?\s*(~?\d{4}))?\)", re.I) def _parse_spouse_context(full_text: str) -> dict: """ Extract name, birth, death, occupation from 's'unit avec ...' sentence. Returns {"name", "birth", "death", "occu"}. """ m = _RE_SPOUSE_CONTEXT.search(full_text) if not m: return {} ctx = m.group(1).strip() # Name: up to first ( or , nm = re.match(r"([^(,]+)", ctx) name = nm.group(1).strip() if nm else ctx.split(",")[0].strip() # Dates from parenthetical (birth->death or ~birth) birth = death = "" dm = _RE_SPOUSE_LIFE.search(ctx) if dm: b_raw = dm.group(1) b_year = b_raw.lstrip("~") birth = f"ABT {b_year}" if b_raw.startswith("~") else b_year if dm.group(2): d_raw = dm.group(2) d_year = d_raw.lstrip("~") death = f"ABT {d_year}" if d_raw.startswith("~") else d_year # Occupation: text after dates (or after name) before "le fils/la fille/les enfants" occu = "" after = ctx[dm.end():].strip() if dm else ctx[len(name):].strip() after = after.lstrip(",").strip() om = re.match(r"([^,(]+?)(?=\s*,\s*(?:le|la|les)\s+(?:fils|fille|enfant)|$)", after, re.I) if om: candidate = om.group(1).strip().rstrip(".") # Only keep genuine occupations — reject parentage/family descriptions if (candidate and len(candidate) < 60 and not re.search(r"\best\b|\bsont\b|\bfille\b|\bfils\b|\benfant\b|\bparents\b", candidate, re.I)): occu = candidate return {"name": name, "birth": birth, "death": death, "occu": occu} # ── Gray paragraph grouping (witnesses, godparents, notes) ──────────────────── # Headers that introduce a group of witness lines _RE_WITNESS_HDR = re.compile( r"^(Présents?|Témoins?|Déclarants?)\s*:?\s*$", re.I) _RE_MARR_WITNESS_HDR = re.compile( r"^(Témoins?\s+au\s+mariage|Présents?\s+au\s+contrat|" r"Présents?\s+à\s+la\s+célébration|Présents?\s+au\s+mariage)\b", re.I) _RE_DEAT_WITNESS_HDR = re.compile( r"^(Témoins?\s+au\s+décès|Présents?\s+au\s+décès)\b", re.I) # Lines that are part of a witness list _RE_WITNESS_ITEM = re.compile(r"^[\-–•]\s+\S", re.I) # Single-line godparent references _RE_GODPARENT = re.compile(r"^(Parrain|Marraine)\s*[:;]?\s+\S", re.I) # Archive source references in gray paragraphs _RE_GRAY_ARCHIVE = re.compile( r"^Archives\s+d[eé]\w*\s+.+?(?:\s+-\s+|\s*:\s*)Registre\b", re.I) # Marriage-specific gray notes (contract, publications, dispensation, etc.) _RE_GRAY_MARR_NOTE = re.compile( r"^(?:" r"(?:Date\s+du\s+|(?:Un|Il\s+(?:existe\s+un|a\s+été\s+fait\s+un))\s+)?" r"Contrat\s+de\s+[Mm]ariage\b" r"|Contrat\s+passé\s+(?:chez|devant|par)\b" r"|Accord\s+(?:chez|devant|par)\s+Ma[iî]tre\b" r"|Acte\s+respectueux\b" r"|L[''']acte\s+de\s+mariage\b" r"|Publications?\s+de\s+[Mm]ariage\b" r"|Dispense\s+(?:de|au|du)\b" r"|La\s+mariée?\s+dit\b" r"|Le\s+marié?\s+dit\b" r")", re.I) def _group_gray_notes(all_paras: list[dict]) -> dict: """ Scan paragraphs in order and group gray ones by event association. Returns { "birth_notes": list of multiline strings (for BIRT event NOTE) "death_notes": list of multiline strings (for DEAT event NOTE) "marriage_notes": list of multiline strings (for MARR event NOTE) "general_notes": list of single-line strings (for INDI NOTE) } """ result: dict[str, list] = { "birth_notes": [], "death_notes": [], "marriage_notes": [], "general_notes": [], } # Track which events have been encountered (determines gray-note assignment) seen_marr = False seen_deat = False current_group: list[str] = [] # lines of the current witness group current_key: str = "" # "birth_notes" | "death_notes" | "marriage_notes" def flush(): nonlocal current_group, current_key if current_group and current_key: result[current_key].append("\n".join(current_group)) current_group = [] current_key = "" for para in all_paras: color = para["color"] text = para["text"].strip() if not text: continue # ── Non-gray paragraphs: update event context and flush ── if color != "gray": flush() if re.search(r"\bse marient\b|contrat de mariage\s+le\b", text, re.I): seen_marr = True if re.search(r"\best décédé\b|\bmeurt le\b", text, re.I): seen_deat = True continue # ── Gray paragraph ── # Determine which event key to use for ungrouped gray items def current_event_key() -> str: if seen_deat: return "death_notes" if seen_marr: return "marriage_notes" return "birth_notes" # 1a. Marriage-specific notes (contracts, publications, dispensations…) if _RE_GRAY_MARR_NOTE.match(text): flush() if len(text) > 10: result["marriage_notes"].append(text) continue # 1b. Archive/registry references → contextual event bucket if _RE_GRAY_ARCHIVE.match(text): flush() if len(text) > 10: result[current_event_key()].append(text) continue # 2. Explicit death-witness header if _RE_DEAT_WITNESS_HDR.match(text): flush() current_key = "death_notes" current_group = [text] continue # 3. Explicit marriage-witness header if _RE_MARR_WITNESS_HDR.match(text): flush() current_key = "marriage_notes" current_group = [text] continue # 4. Generic witness header ("Présents :", "Témoins :", "Déclarants :") if _RE_WITNESS_HDR.match(text): flush() current_key = current_event_key() current_group = [text] continue # 5. Witness item line (starts with "-") — append to current group or start one if _RE_WITNESS_ITEM.match(text): if current_key: current_group.append(text) else: # Orphan item (no preceding header) — start implicit group current_key = current_event_key() current_group = [text] continue # 6. Single-line godparent reference → birth note if _RE_GODPARENT.match(text): flush() if len(text) > 5: result["birth_notes"].append(text) continue # 7. Everything else → flush any group, then route by current event context flush() if len(text) > 10: result[current_event_key()].append(text) flush() return result def _emit_note_block(lines: list[str], base_level: int) -> list[str]: """ Emit a multiline note as GEDCOM NOTE + CONT lines. base_level: level of the NOTE line (2 for event-level, 1 for INDI-level). """ out = [] note_lines = lines if len(note_lines) == 1: out.append(gedcom_line(base_level, "NOTE", note_lines[0][:248])) else: out.append(gedcom_line(base_level, "NOTE", note_lines[0][:248])) for continuation in note_lines[1:]: out.append(gedcom_line(base_level + 1, "CONT", continuation[:248])) return out def _emit_grouped_notes(note_strings: list[str], base_level: int) -> list[str]: """Emit a list of note strings (each possibly multiline) as GEDCOM NOTE blocks.""" out = [] for ns in note_strings: out.extend(_emit_note_block(ns.split("\n"), base_level)) return out # ── French calendar helpers ─────────────────────────────────────────────────── MOIS_FR = { "janvier": "JAN", "février": "FEB", "fevrier": "FEB", "mars": "MAR", "avril": "APR", "mai": "MAY", "juin": "JUN", "juillet": "JUL", "août": "AUG", "aout": "AUG", "septembre": "SEP", "octobre": "OCT", "novembre": "NOV", "décembre": "DEC", "decembre": "DEC", } MOIS_PAT = "|".join(MOIS_FR.keys()) JOURS_PAT = "lundi|mardi|mercredi|jeudi|vendredi|samedi|dimanche" def fr_date(day: str | None, month_fr: str | None, year: str | None, prefix: str = "") -> str: """Convert French date parts to GEDCOM date string.""" parts = [] if prefix: parts.append(prefix) if day and day not in ("0", ""): parts.append(str(int(day))) if month_fr: m = MOIS_FR.get(month_fr.lower().strip()) if m: parts.append(m) if year: parts.append(year) return " ".join(parts) if parts else "" # ── Regex patterns ──────────────────────────────────────────────────────────── _D = rf"(?:(?:{JOURS_PAT})\s+)?(\d+)(?:er|ème|eme|e)?\s+({MOIS_PAT})\s+(\d{{4}})" # Birth RE_BORN_FULL = re.compile( rf"(?:voit le jour|est n[eé]e?|naît)\s+le\s+{_D}" rf"(?:\s+[àa]\s+(.*?))?(?:\.|$)", re.I) RE_BORN_APPROX = re.compile( rf"(?:voit le jour|est n[eé]e?|né[e]?)\s+vers\s+(?:({MOIS_PAT})\s+)?(\d{{4}})", re.I) RE_BORN_YEAR = re.compile( rf"(?:né[e]?|voit le jour)\s+(?:en|vers)\s+(\d{{4}})", re.I) RE_BAPTISM = re.compile( rf"(?:est baptisé[e]?)\s+[àa]\s+(.*?),\s+le\s+{_D}", re.I) RE_BORN_INLINE = re.compile(r"né[e]?\s+vers\s+(\d{4})", re.I) # "née vers 1699" RE_BORN_EN = re.compile(r"né[e]?\s+en\s+(\d{4})", re.I) # Death RE_DEAD_FULL = re.compile( rf"est décédé[e]?\s+le\s+{_D}(?:.*?[àa]\s+([\w\s'\(\),\-]+?))?(?:\.|$)", re.I) RE_DEAD_BEF = re.compile(r"est décédé[e]?\s+avant\s+(.+?)(?:\.|$)", re.I) RE_DEAD_AFT = re.compile(r"est décédé[e]?\s+après\s+(\d{4})", re.I) RE_DEAD_YEAR = re.compile(r"est décédé[e]?\s+en\s+(\d{4})", re.I) # RE_MEURT: only matches when NOT preceded by a relative clause in the same sentence # (sentences starting with "Sa mère/son père/son époux meurt" are excluded in parse_death) RE_MEURT = re.compile( rf"meurt\s+le\s+{_D}", re.I) _RE_RELATIVE_MEURT = re.compile( r"\b(?:sa|son)\s+(?:père|mère|époux|épouse|mari|femme|frère|sœur)\b[^.]*meurt", re.I) # Marriage RE_MARR = re.compile( rf"(?:se marient|mariage (?:civil|religieux|est célébré))[^\d]*le\s+{_D}" rf"(?:\s+[àa]\s+(.*?))?(?:\.|$)", re.I) RE_CONTRAT = re.compile( rf"contrat de mariage\s+le\s+{_D}" rf"(?:\s+[àa]\s+(.*?))?(?:\.|$)", re.I) RE_SPOUSE = re.compile( r"(?:Il|Elle)\s+s[''`]unit\s+avec\s+(.*?)(?:,|\()", re.I) RE_SPOUSE_DATES = re.compile(r"\(([~\d]{4})-?([~\d]{4})?\)", re.I) # Occupation RE_OCCU_SERA = re.compile(r"\w+\s+sera\s+([^.]+)\.", re.I) RE_OCCU_EST = re.compile(r"\w+\s+est\s+([a-zéàèù][a-zéàèù\-\s]+?)[\.,]", re.I) # Person ID header — matches standalone IDs like "1", "1a", "1.2", "1.2b", "1.4.4b.1" RE_PERSON_ID = re.compile(r"^(\d+[a-z]?(?:\.\d+[a-z]?)*)\s*$", re.I) # Name line (bold): "Pierre FABRE voit le jour..." RE_NAME_LINE = re.compile( r"^([A-ZÀ-Ü][a-zà-ü\-]+(?:\s+[A-ZÀ-Ü][a-zà-ü\-]+)*" # first name(s) r"\s+[A-ZÀÂÇÉÈÊËÎÏÔÛÙÜŸÆŒ][A-ZÀÂÇÉÈÊËÎÏÔÛÙÜŸÆŒ\s'\-]+?)" # SURNAME r"\s+(?:voit le jour|est né[e]?|naît|né[e]?\s)", re.I ) # Sex from prose RE_FILS = re.compile(r"\b(Il|Elle)\s+est\s+(?:l[e']|le)\s*fils\b", re.I) RE_FILLE = re.compile(r"\b(Il|Elle)\s+est\s+(?:l[ae']|la)\s*fille\b", re.I) RE_FILS2 = re.compile(r"\bfils\s+(?:légitim|naturel)", re.I) RE_FILLE2 = re.compile(r"\bfille\s+(?:légitim|naturell)", re.I) def parse_sex(full_text: str) -> str: """Return 'M', 'F', or '' from prose clues.""" if RE_FILS.search(full_text) or RE_FILS2.search(full_text): return "M" if RE_FILLE.search(full_text) or RE_FILLE2.search(full_text): return "F" # Pronoun fallback if re.search(r"\bIl\s+est\b", full_text): return "M" if re.search(r"\bElle\s+est\b", full_text): return "F" return "" def parse_birth(full_text: str) -> dict: """Extract birth/baptism date and place.""" result = {"date": "", "plac": "", "type": "BIRT"} m = RE_BORN_FULL.search(full_text) if m: result["date"] = fr_date(m.group(1), m.group(2), m.group(3)) result["plac"] = _clean_place(m.group(4) or "") return result m = RE_BAPTISM.search(full_text) if m: result["type"] = "BAPM" result["plac"] = _clean_place(m.group(1) or "") result["date"] = fr_date(m.group(2), m.group(3), m.group(4)) return result m = RE_BORN_APPROX.search(full_text) if m: result["date"] = fr_date(None, m.group(1), m.group(2), "ABT") return result for pat in (RE_BORN_YEAR, RE_BORN_EN, RE_BORN_INLINE): m = pat.search(full_text) if m: result["date"] = "ABT " + m.group(1) return result return result # Matches "à l'âge de ..." to be skipped in death sentences _AGE_CLAUSE = re.compile(r",\s*[àa]\s+l['']\âge\s+de\s+[^,]+", re.I) # Matches final place: last ", à Place" before period _DEAD_PLACE = re.compile(r",\s*[àa]\s+([A-ZÀ-Ü][^,.]+?(?:\([A-Za-zÀ-Ü\s\-]+\))?)\s*(?:\.|$)", re.I) def _extract_death_place(sentence: str) -> str: """Extract place from a death sentence, skipping 'à l'âge de' clauses.""" # Remove age clause so we don't pick it up as a place cleaned = _AGE_CLAUSE.sub("", sentence) # Find last place mention matches = list(_DEAD_PLACE.finditer(cleaned)) if matches: return _clean_place(matches[-1].group(1)) return "" def parse_death(full_text: str) -> dict: """Extract death date and place.""" result = {"date": "", "plac": ""} m = RE_DEAD_FULL.search(full_text) if m: result["date"] = fr_date(m.group(1), m.group(2), m.group(3)) # Extract place from the full death sentence separately # Find the sentence that contains the match sent_start = full_text.rfind("est décédé", 0, m.end()) if sent_start == -1: sent_start = m.start() sentence = full_text[sent_start:full_text.find(".", m.end()) + 1] result["plac"] = _extract_death_place(sentence) return result # RE_MEURT: only when the sentence is about the main person, not a relative m = RE_MEURT.search(full_text) if m: # Check the sentence containing this match sent_start = full_text.rfind(".", 0, m.start()) sentence = full_text[sent_start + 1: full_text.find(".", m.end()) + 1] if not _RE_RELATIVE_MEURT.search(sentence): result["date"] = fr_date(m.group(1), m.group(2), m.group(3)) return result m = RE_DEAD_BEF.search(full_text) if m: raw = m.group(1).strip().split(",")[0].rstrip(".") result["date"] = "BEF " + raw return result m = RE_DEAD_AFT.search(full_text) if m: result["date"] = "AFT " + m.group(1) return result m = RE_DEAD_YEAR.search(full_text) if m: result["date"] = m.group(1) return result return result _RE_NO_CHILDREN = re.compile(r"pas\s+d[''e]enfants|il\s+n[''y]\s+a\s+pas", re.I) _RE_HAS_CHILDREN = re.compile( r"(?:aura|a\s+eu|avez?|ont)\s+\w+\s+enfants?|(?:ce\s+couple|ils)\s+aura", re.I) def _parse_one_marriage(segment: str) -> dict: """Parse spouse + date + place from a single 's'unit avec …' segment.""" result = {"date": "", "plac": "", "spouse": "", "spouse_birth": "", "spouse_death": "", "spouse_occu": "", "source": {}, "has_children_text": False} spouse_info = _parse_spouse_context(segment) if spouse_info.get("name"): result["spouse"] = spouse_info["name"] result["spouse_birth"] = spouse_info.get("birth", "") result["spouse_death"] = spouse_info.get("death", "") result["spouse_occu"] = spouse_info.get("occu", "") else: m = RE_SPOUSE.search(segment) if m: result["spouse"] = re.sub(r"\s*\(.*?\)", "", m.group(1).strip()).strip() for pat in (RE_MARR, RE_CONTRAT): m = pat.search(segment) if m: result["date"] = fr_date(m.group(1), m.group(2), m.group(3)) result["plac"] = _clean_place(m.group(4) or "") break # Detect inline children mention in this segment result["has_children_text"] = ( bool(_RE_HAS_CHILDREN.search(segment)) and not bool(_RE_NO_CHILDREN.search(segment)) ) return result def parse_marriages(full_text: str, italic_texts: list[str] | None = None) -> list[dict]: """ Return list of marriage dicts, one per union found in full_text. Each dict: {spouse, spouse_birth, spouse_death, spouse_occu, date, plac, source, has_children_text}. """ splits = [m.start() for m in re.finditer(r"\bs[''`]unit\s+avec\b", full_text, re.I)] if not splits: return [] marriages = [] for i, start in enumerate(splits): end = splits[i + 1] if i + 1 < len(splits) else len(full_text) seg = full_text[start:end] marriages.append(_parse_one_marriage(seg)) # Assign contract sources from italic paragraphs to the best-matching marriage for it in (italic_texts or []): src = _parse_contract_source(it) if not src: continue # Prefer marriage with a date; fall back to last target = next((m for m in reversed(marriages) if m["date"]), marriages[-1]) if not target["source"]: target["source"] = src return marriages def parse_occupation(full_text: str) -> str: m = RE_OCCU_SERA.search(full_text) if m: return m.group(1).strip().rstrip(".") return "" def _clean_place(raw: str) -> str: """Normalise a place string extracted from HTML text.""" if not raw: return "" # Strip trailing punctuation (keep closing paren if place has department in parens) p = raw.strip().rstrip(".,;(") p = re.sub(r"\s+", " ", p).strip() # Trim at known sentence-ending words p = re.split(r"\s+(?:Il|Elle|Ce|Ils|Leur|Le|La|Les|Un|Une|Son|Sa)\b", p, maxsplit=1)[0] return p[:80] # GEDCOM line limit # ── HTML / paragraph parsing ────────────────────────────────────────────────── def extract_paragraphs(html_body: str) -> list[dict]: """ Parse HTML body into a list of paragraph dicts: {text, color, is_bold, bold_text} Colors: black, navy, red, gray (from inline CSS or color=). """ soup = BeautifulSoup(html_body, "html.parser") def tag_color(tag) -> str: style = tag.get("style", "") m = re.search(r"color:\s*(\w+)", style) if m: return m.group(1).lower() color_attr = tag.get("color", "") if color_attr: named = {"#000000": "black", "#000080": "navy", "navy": "navy", "red": "red", "gray": "gray", "grey": "gray"} return named.get(color_attr.lower(), color_attr.lower()) return "" # Collect all block elements in document order: # - all

tags # - leaf

tags (no nested div children) — some families use divs instead of p block_tags = [ tag for tag in soup.find_all(["p", "div"]) if tag.name == "p" or not tag.find("div") ] paragraphs = [] for p in block_tags: # Determine dominant color (first explicit color found) color = "black" for tag in p.descendants: if hasattr(tag, "get"): c = tag_color(tag) if c: color = c break # Bold detection — or bold_spans = p.find_all(["b", "strong"]) bold_text = " ".join(b.get_text(" ", strip=True) for b in bold_spans).strip() is_bold = bool(bold_text) # Full text full_text = p.get_text(" ", strip=True).replace("\xa0", " ").strip() full_text = re.sub(r"\s+", " ", full_text) # Italic detection — whole paragraph is italic when all visible text is in / italic_spans = p.find_all(["i", "em"]) is_italic = bool(italic_spans) and not is_bold if full_text: paragraphs.append({ "text": full_text, "color": color, "is_bold": is_bold, "bold_text": re.sub(r"\s+", " ", bold_text), "is_italic": is_italic, }) return paragraphs def split_into_person_blocks(paragraphs: list[dict]) -> list[dict]: """ Split paragraph list into person blocks using the bold ID pattern. Returns list of {id, name_line, paras}. Generation-1 pages have no explicit ID line; we assign id="1". """ blocks = [] current = None for para in paragraphs: text = para["text"] bold_text = para["bold_text"] # ── Is this a standalone person-ID line? ── # Criterion: bold, black, and the ENTIRE text (stripped) is a valid ID if para["is_bold"] and para["color"] in ("black", ""): candidate = re.sub(r"[\s\xa0]+", "", bold_text) full_stripped = re.sub(r"[\s\xa0]+", "", text) if RE_PERSON_ID.match(candidate) and RE_PERSON_ID.match(full_stripped): if current: blocks.append(current) current = {"id": candidate, "name_line": "", "paras": []} continue # ── Is this a name+birth line? (bold start, no id yet ── if para["is_bold"] and current is not None and not current["name_line"]: current["name_line"] = text current["paras"].append(para) continue # ── Generation-1 edge case: first bold non-id paragraph ── if para["is_bold"] and current is None: # Likely the gen-1 title paragraph, skip # But if it looks like a name+birth, create implicit id="0" (root ancestor) if RE_NAME_LINE.match(bold_text) or RE_NAME_LINE.match(text): current = {"id": "0", "name_line": text, "paras": [para]} continue if current is not None: current["paras"].append(para) if current: blocks.append(current) return blocks def parse_block(block: dict, family_name: str) -> dict: """ Convert a person block into a structured person dict. """ person_id = block["id"] name_line = block["name_line"] all_paras = block["paras"] full_text = " ".join(p["text"] for p in all_paras) gray_notes = [p["text"] for p in all_paras if p["color"] == "gray"] # ── Name ── # Prefer the bold text of the first paragraph (reliable) over regex extraction bold0 = (all_paras[0]["bold_text"] if all_paras else "").strip() if bold0 and not RE_PERSON_ID.match(re.sub(r"[\s\xa0]+", "", bold0)): name = bold0 elif name_line: # Fallback: extract name portion before birth keyword name = re.split(r"\s+(?:voit le jour|est né[e]?|naît|né[e]?\s)", name_line, maxsplit=1)[0].strip() else: name = "" # Split name into given/surname: surname is the ALL-CAPS part given, surname = _split_name(name) # ── Sex — use only the intro sentence (before spouse description) ── # Searching the whole block picks up spouse/child "fils légitime" etc. intro_end = re.search(r"\bs[''`]unit\s+avec\b", full_text) sex_context = full_text[: intro_end.start()] if intro_end else full_text[:500] sex = parse_sex(sex_context) # Secondary: "est née" / "est né" in the name line if not sex: if re.search(r"\best\s+née\b", full_text[:300], re.I): sex = "F" elif re.search(r"\best\s+né\b(?!e)", full_text[:300], re.I): sex = "M" # Fallback from name color in later text if not sex and all_paras: for p in all_paras: if p["color"] == "navy": sex = "M"; break if p["color"] == "red": sex = "F"; break # ── Birth / Baptism ── birth = parse_birth(full_text) # ── Death ── death = parse_death(full_text) # ── Marriages (may be plural) ── italic_texts = [p["text"] for p in all_paras if p.get("is_italic")] marriages = parse_marriages(full_text, italic_texts) # ── Occupation ── occu = parse_occupation(full_text) # ── Gray paragraph groups (witnesses, godparents, general notes) ── grouped = _group_gray_notes(all_paras) # Attach marriage notes to the first marriage; create minimal entry if needed if grouped["marriage_notes"]: if marriages: marriages[0].setdefault("notes", []).extend(grouped["marriage_notes"]) else: marriages = [{ "date": "", "plac": "", "spouse": "", "spouse_birth": "", "spouse_death": "", "spouse_occu": "", "source": {}, "has_children_text": False, "notes": list(grouped["marriage_notes"]), }] # ── Children listed inline ── children_inline = _extract_children_inline(full_text) return { "id": person_id, "family": family_name, "name": name, "given": given, "surname": surname, "sex": sex, "birth": birth, "death": death, "marriages": marriages, "occupation": occu, "birth_notes": grouped["birth_notes"], "death_notes": grouped["death_notes"], "general_notes": grouped["general_notes"], "children_inline": children_inline, "full_text": full_text, } def _split_name(name: str) -> tuple[str, str]: """Split 'Pierre FABRE' → ('Pierre', 'FABRE').""" # Surname = longest contiguous run of uppercase tokens at the END tokens = name.split() surname_tokens = [] given_tokens = [] i = len(tokens) - 1 while i >= 0 and (tokens[i].upper() == tokens[i] or tokens[i] in ("de", "d'", "du", "des", "l'", "la", "le")): surname_tokens.insert(0, tokens[i]) i -= 1 given_tokens = tokens[:i+1] return " ".join(given_tokens), " ".join(surname_tokens) def _extract_children_inline(text: str) -> list[str]: """ Extract names of children listed as '- Name né(e) en YEAR' Returns list of first-name strings. """ children = [] for m in re.finditer(r"-\s+([A-ZÀ-Ü][a-zà-üA-ZÀ-Ü\s]+?)\s+né", text): children.append(m.group(1).strip()) return children # ── SQL extraction ──────────────────────────────────────────────────────────── def parse_sql_values(line: str) -> list: s = line.strip() if s.startswith("("): s = s[1:] for sfx in (");", "),", ")"): if s.endswith(sfx): s = s[:-len(sfx)]; break ESC = {"n":"\n","r":"\r","t":"\t","b":"\x08", "\\":"\\"," '":"'",'"':'"',"0":"\x00","Z":"\x1a"} values, i, n = [], 0, len(s) while i < n: while i < n and s[i] in " \t": i += 1 if i >= n: break if s[i:i+4] == "NULL": values.append(None); i += 4 elif s[i] == "'": i += 1; buf = [] while i < n: c = s[i] if c == "\\" and i+1 < n: buf.append(ESC.get(s[i+1], s[i+1])); i += 2 elif c == "'": i += 1; break else: buf.append(c); i += 1 values.append("".join(buf)) else: j = i while j < n and s[j] != ",": j += 1 values.append(s[i:j].strip()); i = j while i < n and s[i] in " \t,": i += 1 return values def stream_filiation_nodes(sql_file: Path) -> dict[int, dict]: """ One-pass stream: collect title+type from drupal_node, body from drupal_node_revisions for all 'book' type nodes whose title contains 'filiation'. Returns {nid: {title, body}}. """ node_cols = [] rev_cols = [] nodes: dict[int, dict] = {} current_table = None TARGETS = {"drupal_node", "drupal_node_revisions"} INSERT_RE = re.compile(r"INSERT INTO `([^`]+)` \((.+)\) VALUES", re.I) with open(sql_file, encoding="utf-8", errors="replace") as fh: for line in fh: ls = line.rstrip("\r\n") m = INSERT_RE.match(ls) if m: tname = m.group(1) current_table = tname if tname in TARGETS else None if tname == "drupal_node": node_cols = [c.strip().strip("`") for c in m.group(2).split(",")] elif tname == "drupal_node_revisions": rev_cols = [c.strip().strip("`") for c in m.group(2).split(",")] continue if current_table is None: continue stripped = ls.strip() if not stripped.startswith("("): if stripped.endswith(";"): current_table = None continue row = parse_sql_values(stripped) if current_table == "drupal_node" and node_cols: d = dict(zip(node_cols, row)) if d.get("type") == "book": title = d.get("title", "") if "filiation" in title.lower() or "filiations" in title.lower(): try: nid = int(d["nid"]) except (ValueError, TypeError): pass else: nodes[nid] = {"title": title, "body": ""} elif current_table == "drupal_node_revisions" and rev_cols: d = dict(zip(rev_cols, row)) try: nid = int(d["nid"]) except (ValueError, TypeError): continue if nid in nodes: nodes[nid]["body"] = d.get("body") or "" if stripped.endswith(";"): current_table = None return nodes # ── Family grouping ─────────────────────────────────────────────────────────── def family_name_from_title(title: str) -> str: """'Les filiations FABRE : Génération 3' → 'FABRE'""" t = title.replace("Les filiations", "").strip() # Remove suffix starting at ' :' t = t.split(":")[0].strip() # Remove leading d', de , d' t = re.sub(r"^(?:d[''e]\s*|de\s+|du\s+|des\s+|l['']\s*)", "", t, flags=re.I) return t.strip() def generation_number(title: str) -> int: """Extract generation number; 0 for root/présentation pages.""" m = re.search(r"[Gg]én[eé]ration\s+(\d+)", title) return int(m.group(1)) if m else 0 def group_by_family(nodes: dict[int, dict]) -> dict[str, list[dict]]: """ Return {family_name: [sorted list of {nid, title, body, gen}]} Only includes generation pages (gen > 0) and the root page (gen == 0 when not 'présentation'). """ families: dict[str, list] = {} for nid, info in nodes.items(): title = info["title"] fname = family_name_from_title(title) gen = generation_number(title) if "présentation" in title.lower(): continue # skip intro pages if not fname: continue families.setdefault(fname, []).append( {"nid": nid, "title": title, "body": info["body"], "gen": gen} ) for fname in families: families[fname].sort(key=lambda x: (x["gen"] == 0, x["gen"])) return families # ── Cross-page person assembly ──────────────────────────────────────────────── def parent_id(person_id: str) -> str | None: """ Given a person ID like '1.4.4b.1', return the parent's ID '1.4.4b'. Returns None for root. """ parts = person_id.rsplit(".", 1) if len(parts) == 1: return None return parts[0] if parts[0] else None def child_union_letter(person_id: str) -> str: """ Return the union letter from the last component of a person ID. '5.1.7.1a.5b.3a.7.1a' → 'a' (child 1 of union "a" of parent) '5.1.7.1a.5b.3a.7.2' → '' (no explicit union letter) """ last = person_id.rsplit(".", 1)[-1] m = re.match(r"^\d+([a-z]*)$", last, re.I) return m.group(1).lower() if m else "" # ── GEDCOM generation ───────────────────────────────────────────────────────── _indi_counter = 0 _fam_counter = 0 _sour_counter = 0 _repo_counter = 0 # Registries reset per file _sour_registry: dict[tuple, str] = {} # (title_lc, depot_lc, caln_lc) → xref _repo_registry: dict[str, str] = {} # depot_lc → xref _sour_records: dict[str, dict] = {} # xref → {title, depot_xref, caln} _repo_records: dict[str, str] = {} # xref → name def new_indi() -> str: global _indi_counter _indi_counter += 1 return f"@I{_indi_counter:04d}@" def new_fam() -> str: global _fam_counter _fam_counter += 1 return f"@F{_fam_counter:04d}@" def new_sour() -> str: global _sour_counter _sour_counter += 1 return f"@S{_sour_counter:04d}@" def new_repo() -> str: global _repo_counter _repo_counter += 1 return f"@R{_repo_counter:04d}@" def _get_or_create_repo(depot: str) -> str: key = depot.strip().lower() if key in _repo_registry: return _repo_registry[key] rx = new_repo() _repo_registry[key] = rx _repo_records[rx] = depot.strip() return rx def _get_or_create_sour(title: str, depot: str, caln: str) -> str: key = (title.strip().lower(), depot.strip().lower(), caln.strip().lower()) if key in _sour_registry: return _sour_registry[key] rx = new_sour() _sour_registry[key] = rx repo_xref = _get_or_create_repo(depot) if depot else "" _sour_records[rx] = {"title": title.strip(), "repo_xref": repo_xref, "caln": caln.strip()} return rx def gedcom_line(level: int, tag: str, value: str = "") -> str: line = f"{level} {tag}" if value: line += f" {value}" return line def person_to_gedcom(person: dict, indi_ref: str, famc: list[str], fams: list[str]) -> list[str]: """Build GEDCOM INDI record lines for one person.""" lines = [gedcom_line(0, indi_ref, "INDI")] # Name given = person.get("given", "") surname = person.get("surname", "") full = f"{given} /{surname}/" if surname else given if full: lines.append(gedcom_line(1, "NAME", full)) if given: lines.append(gedcom_line(2, "GIVN", given)) if surname: lines.append(gedcom_line(2, "SURN", surname)) # Sex sex = person.get("sex", "") if sex: lines.append(gedcom_line(1, "SEX", sex)) # Birth / Baptism birth = person.get("birth", {}) birth_has_data = birth.get("date") or birth.get("plac") birth_notes = person.get("birth_notes", []) if birth_has_data or birth_notes: event_tag = birth.get("type", "BIRT") lines.append(gedcom_line(1, event_tag)) if birth_has_data: _t, _raw = _split_place(birth.get("plac", "")) if birth.get("date"): lines.append(gedcom_line(2, "DATE", birth["date"])) if _t: lines.append(gedcom_line(3, "TIME", _t)) elif _t: lines.append(gedcom_line(2, "TIME", _t)) _p = _expand_place(_raw) if _p: lines.append(gedcom_line(2, "PLAC", _p)) lines.extend(_emit_grouped_notes(birth_notes, base_level=2)) # Death death = person.get("death", {}) death_has_data = death.get("date") or death.get("plac") death_notes = person.get("death_notes", []) if death_has_data or death_notes: lines.append(gedcom_line(1, "DEAT")) if death_has_data: _t, _raw = _split_place(death.get("plac", "")) if death.get("date"): lines.append(gedcom_line(2, "DATE", death["date"])) if _t: lines.append(gedcom_line(3, "TIME", _t)) elif _t: lines.append(gedcom_line(2, "TIME", _t)) _p = _expand_place(_raw) if _p: lines.append(gedcom_line(2, "PLAC", _p)) lines.extend(_emit_grouped_notes(death_notes, base_level=2)) # Occupation occu = person.get("occupation", "") if occu: lines.append(gedcom_line(1, "OCCU", occu)) # Family links for fc in famc: lines.append(gedcom_line(1, "FAMC", fc)) for fs in fams: lines.append(gedcom_line(1, "FAMS", fs)) # General notes (INDI level) lines.extend(_emit_grouped_notes(person.get("general_notes", []), base_level=1)) return lines def build_gedcom_for_family(family_name: str, pages: list[dict]) -> list[str]: """ Parse all generation pages for a family, build persons dict, resolve links, and emit GEDCOM lines. """ global _indi_counter, _fam_counter # ── Step 1: parse all pages into a flat persons dict ── persons_by_id: dict[str, dict] = {} # person_id → person data for page in pages: body = page["body"] if not body.strip(): continue paras = extract_paragraphs(body) blocks = split_into_person_blocks(paras) for block in blocks: p = parse_block(block, family_name) pid = p["id"] if pid in persons_by_id: # Merge: later pages may have more detail existing = persons_by_id[pid] for field in ("birth", "death", "occupation"): if not existing.get(field) and p.get(field): existing[field] = p[field] for notes_field in ("birth_notes", "death_notes", "general_notes"): existing.setdefault(notes_field, []).extend(p.get(notes_field, [])) # Merge marriages: append new spouses not already known if p.get("marriages"): ex_spouses = {m["spouse"].lower() for m in existing.get("marriages", [])} for nm in p["marriages"]: if nm["spouse"].lower() not in ex_spouses: existing.setdefault("marriages", []).append(nm) ex_spouses.add(nm["spouse"].lower()) # Update sex if missing if not existing.get("sex") and p.get("sex"): existing["sex"] = p["sex"] else: persons_by_id[pid] = p if not persons_by_id: return [] # ── Step 2: assign INDI xrefs ── xref: dict[str, str] = {} for pid in sorted(persons_by_id.keys()): xref[pid] = new_indi() # ── Step 3: resolve parent→child links and union letters ── # "0" is the implicit root (gen-1 ancestor) has_root = "0" in persons_by_id # For each child: which union letter do they belong to? # child_ul[pid] = "" | "a" | "b" | ... # parent_union_letters[parent_pid] = sorted set of union letters seen in children child_ul: dict[str, str] = {} parent_union_letters: dict[str, list[str]] = {} for pid in persons_by_id: if pid == "0": continue par = parent_id(pid) if par is None and has_root: par = "0" if par and par in persons_by_id: ul = child_union_letter(pid) child_ul[pid] = ul ls = parent_union_letters.setdefault(par, []) if ul not in ls: ls.append(ul) for ls in parent_union_letters.values(): ls.sort() # fam_key = parent_pid + "#" + union_letter (or parent_pid if only 1 marriage) # We determine fam_keys from marriages list AND from children's actual union letters. # # Strategy: for each parent: # - union letters from children tell us which unions produced descendants # - text marriages list tells us all unions (including childless ones) # - We match: marriages marked has_children_text → union letters (in order) # remaining marriages → synthetic childless keys def fam_keys_for_parent(par_pid: str) -> list[str]: """ Return ordered list of fam_keys for this parent's marriages. One fam_key per marriage in text order. """ marriages = persons_by_id[par_pid].get("marriages", []) if not marriages: return [] if len(marriages) == 1: # Single marriage: use letters from children, or bare parent_pid uls = parent_union_letters.get(par_pid, [""]) return [f"{par_pid}#{uls[0]}" if uls else par_pid] # Multiple marriages: split into "with-children" and "childless" groups # using the text hint, then map union letters union_letters = sorted(parent_union_letters.get(par_pid, [])) with_children = [m for m in marriages if m.get("has_children_text")] without_children = [m for m in marriages if not m.get("has_children_text")] # Fallback: if text detection failed, assume last marriage has children if not with_children and union_letters: with_children = [marriages[-1]] without_children = marriages[:-1] keys = [] ul_iter = iter(union_letters) childless_idx = [0] for m in marriages: if m in with_children: ul = next(ul_iter, f"_ul{len(keys)}") keys.append(f"{par_pid}#{ul}") else: keys.append(f"{par_pid}#childless{childless_idx[0]}") childless_idx[0] += 1 return keys # Build fam_xrefs: fam_key → GEDCOM @Fxxxx@ xref fam_xrefs: dict[str, str] = {} # Build famc map: child_pid → fam_key (which FAM this child belongs to) famc_fam: dict[str, str] = {} for pid in persons_by_id: if pid == "0": continue par = parent_id(pid) if par is None and has_root: par = "0" if not (par and par in persons_by_id): continue # Determine which fam_key this child belongs to ul = child_ul.get(pid, "") par_marriages = persons_by_id[par].get("marriages", []) par_keys = fam_keys_for_parent(par) # Create fam_xrefs for all marriages of parent if not yet done for fk in par_keys: if fk not in fam_xrefs: fam_xrefs[fk] = new_fam() # Match child to correct fam_key by union letter # fam_key format: "parent#ul" or "parent" (single marriage) matched_key = None if par_keys: # Try to find the key that contains this union letter for fk in par_keys: suffix = fk.split("#", 1)[1] if "#" in fk else "" if suffix == ul or (not suffix and not ul): matched_key = fk break if matched_key is None: matched_key = par_keys[0] # fallback if matched_key: famc_fam[pid] = matched_key # Also ensure FAM records exist for parents who only have marriages (no children in tree) for par_pid, person in persons_by_id.items(): if not person.get("marriages"): continue par_keys = fam_keys_for_parent(par_pid) for fk in par_keys: if fk not in fam_xrefs: fam_xrefs[fk] = new_fam() # Build reverse: fam_key → list of child_pids fam_children: dict[str, list[str]] = {} for child_pid, fk in famc_fam.items(): fam_children.setdefault(fk, []).append(child_pid) # person_famc: child_pid → @Fxxxx@ xref person_famc: dict[str, str] = { pid: fam_xrefs[fk] for pid, fk in famc_fam.items() if fk in fam_xrefs } # person_fams: parent_pid → list of @Fxxxx@ xrefs (one per marriage) person_fams: dict[str, list[str]] = {} for fk, fr in fam_xrefs.items(): par_pid = fk.split("#")[0] person_fams.setdefault(par_pid, []).append(fr) # ── Step 4: emit GEDCOM ── lines = [] # spouse_data: name_lc → {xref, sex, birth, death, occu, fams} spouse_data: dict[str, dict] = {} def get_or_create_spouse(name: str, sex: str) -> str: key = name.strip().lower() if key not in spouse_data: sx = new_indi() spouse_data[key] = {"xref": sx, "name": name, "sex": sex, "birth": "", "death": "", "occu": "", "fams": []} return spouse_data[key]["xref"] # INDI records for known persons for pid, person in sorted(persons_by_id.items()): indi_ref = xref[pid] famc_list = [person_famc[pid]] if pid in person_famc else [] fams_list = person_fams.get(pid, []) lines += person_to_gedcom(person, indi_ref, famc_list, fams_list) # FAM records — one per fam_key for fam_key, fam_ref in fam_xrefs.items(): par_pid = fam_key.split("#")[0] parent = persons_by_id.get(par_pid, {}) par_sex = parent.get("sex", "") par_xref = xref.get(par_pid, "") # Identify which marriage this FAM corresponds to par_keys = fam_keys_for_parent(par_pid) try: marr_idx = par_keys.index(fam_key) except ValueError: marr_idx = 0 marriages = parent.get("marriages", []) marr = marriages[marr_idx] if marr_idx < len(marriages) else {} lines.append(gedcom_line(0, fam_ref, "FAM")) # Parent as HUSB or WIFE if par_sex == "F": lines.append(gedcom_line(1, "WIFE", par_xref)) else: lines.append(gedcom_line(1, "HUSB", par_xref)) # Spouse spouse_name = marr.get("spouse", "") if spouse_name: skey = spouse_name.strip().lower() spouse_sex = "F" if par_sex == "M" else "M" get_or_create_spouse(spouse_name, spouse_sex) sd = spouse_data[skey] if not sd["birth"] and marr.get("spouse_birth"): sd["birth"] = marr["spouse_birth"] if not sd["death"] and marr.get("spouse_death"): sd["death"] = marr["spouse_death"] if not sd["occu"] and marr.get("spouse_occu"): sd["occu"] = marr["spouse_occu"] sd["fams"].append(fam_ref) spouse_xref = sd["xref"] if par_sex == "M": lines.append(gedcom_line(1, "WIFE", spouse_xref)) else: lines.append(gedcom_line(1, "HUSB", spouse_xref)) # Marriage event marr_notes = marr.get("notes", []) if marr.get("date") or marr.get("plac") or marr_notes: lines.append(gedcom_line(1, "MARR")) _t, _raw = _split_place(marr.get("plac", "")) if marr.get("date"): lines.append(gedcom_line(2, "DATE", marr["date"])) if _t: lines.append(gedcom_line(3, "TIME", _t)) elif _t: lines.append(gedcom_line(2, "TIME", _t)) _p = _expand_place(_raw) if _p: lines.append(gedcom_line(2, "PLAC", _p)) src = marr.get("source", {}) if src.get("title"): sour_xref = _get_or_create_sour( src["title"], src.get("depot", ""), src.get("caln", "")) lines.append(gedcom_line(2, "SOUR", sour_xref)) if src.get("page"): lines.append(gedcom_line(3, "PAGE", src["page"])) lines.extend(_emit_grouped_notes(marr_notes, base_level=2)) # Children belonging to this FAM for child_pid in sorted(fam_children.get(fam_key, [])): child_xref = xref.get(child_pid, "") if child_xref: lines.append(gedcom_line(1, "CHIL", child_xref)) # Spouse INDI records — emitted AFTER FAM loop so spouse_data is complete for sd in spouse_data.values(): sx = sd["xref"] s_name = sd["name"] given, surname = _split_name(s_name) # Preserve original capitalisation for the given name; surname stays upper if not given: given, surname = _split_name(s_name.title()) full = f"{given} /{surname}/" if surname else (given or s_name) lines.append(gedcom_line(0, sx, "INDI")) lines.append(gedcom_line(1, "NAME", full)) if given: lines.append(gedcom_line(2, "GIVN", given)) if surname: lines.append(gedcom_line(2, "SURN", surname)) if sd["sex"]: lines.append(gedcom_line(1, "SEX", sd["sex"])) if sd["birth"]: lines.append(gedcom_line(1, "BIRT")) lines.append(gedcom_line(2, "DATE", sd["birth"])) if sd["death"]: lines.append(gedcom_line(1, "DEAT")) lines.append(gedcom_line(2, "DATE", sd["death"])) if sd["occu"]: lines.append(gedcom_line(1, "OCCU", sd["occu"])) for fref in sd["fams"]: lines.append(gedcom_line(1, "FAMS", fref)) # REPO records for rx, rname in _repo_records.items(): lines.append(gedcom_line(0, rx, "REPO")) lines.append(gedcom_line(1, "NAME", rname)) # SOUR records for sx, srec in _sour_records.items(): lines.append(gedcom_line(0, sx, "SOUR")) lines.append(gedcom_line(1, "TITL", srec["title"])) if srec["repo_xref"]: lines.append(gedcom_line(1, "REPO", srec["repo_xref"])) if srec["caln"]: lines.append(gedcom_line(2, "CALN", srec["caln"])) return lines def build_gedcom_file(family_name: str, pages: list[dict]) -> str: """Return complete GEDCOM file content for one family.""" global _indi_counter, _fam_counter, _sour_counter, _repo_counter global _sour_registry, _repo_registry, _sour_records, _repo_records # Reset counters and registries per file _indi_counter = 0 _fam_counter = 0 _sour_counter = 0 _repo_counter = 0 _sour_registry = {} _repo_registry = {} _sour_records = {} _repo_records = {} body_lines = build_gedcom_for_family(family_name, pages) header = [ "0 HEAD", "1 SOUR BaseCGL", f"2 NAME {SOURCE_STR}", "1 GEDC", "2 VERS 5.5.1", "2 FORM LINEAGE-LINKED", "1 CHAR UTF-8", f"1 NOTE Filiations {family_name} – export automatique depuis les pages Drupal", ] trailer = ["0 TRLR"] return "\n".join(header + body_lines + trailer) + "\n" # ── Main ────────────────────────────────────────────────────────────────────── def main(): OUT_DIR.mkdir(exist_ok=True) print(f"Streaming {SQL_FILE} …", flush=True) nodes = stream_filiation_nodes(SQL_FILE) print(f" Found {len(nodes)} filiation book pages") families = group_by_family(nodes) print(f" Found {len(families)} families: {', '.join(sorted(families))}") total_persons = 0 for fname in sorted(families): pages = families[fname] gen_pages = [p for p in pages if p["gen"] > 0] root_pages = [p for p in pages if p["gen"] == 0] all_pages = root_pages + gen_pages print(f"\n{fname}: {len(all_pages)} pages ({len(gen_pages)} generations)") gedcom = build_gedcom_file(fname, all_pages) out_path = OUT_DIR / f"filiations_{fname.replace(' ', '_').replace(chr(39), '')}.ged" out_path.write_text(gedcom, encoding="utf-8") # Count INDI / FAM records by their level-0 xref prefix n_indi = gedcom.count("0 @I") n_fam = gedcom.count("0 @F") total_persons += n_indi print(f" → {out_path.name} ({n_indi} INDI, {n_fam} FAM, {len(gedcom):,} bytes)") print(f"\nDone. Total individuals across all families: {total_persons}") print(f"Output directory: {OUT_DIR}") if __name__ == "__main__": main()