Initial commit: GEDCOM export scripts and generated filiations

Includes export_lignees_to_gedcom.py (Drupal book → GEDCOM 5.5.1),
export_users_to_webtrees.py, generated GEDCOM files for 16 family
lineages, and webtrees user import SQL. Excludes basesgen.sql (966 MB)
and webtrees_temp_passwords.csv (sensitive).

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-05-28 13:44:28 +02:00
commit f75cbebb44
21 changed files with 54458 additions and 0 deletions
+412
View File
@@ -0,0 +1,412 @@
#!/usr/bin/env python3
"""
Drupal 6 → Webtrees 2 user migration.
Streams basesgen.sql (without importing it) and writes:
webtrees_users_import.sql — SQL to run against the Webtrees database
webtrees_temp_passwords.csv — temporary passwords to distribute to users
Usage:
python3 export_users_to_webtrees.py
Requires: Python ≥ 3.8, bcrypt package (pip install bcrypt)
"""
import csv
import re
import secrets
import string
import sys
from pathlib import Path
# ── Configuration ─────────────────────────────────────────────────────────────
SQL_FILE = Path("/home/yann64/BaseCGL/basesgen.sql")
OUT_SQL = Path("/home/yann64/BaseCGL/webtrees_users_import.sql")
OUT_CSV = Path("/home/yann64/BaseCGL/webtrees_temp_passwords.csv")
PREFIX = "_wt" # Webtrees table prefix
TREE_ID = 1 # Target tree gedcom_id
# Drupal role id → Webtrees canedit level.
# rid=5 appears in the data but not in drupal_role (deleted role); treated as 'access'.
ROLE_CANEDIT = {
4: "admin", # Administrateur
3: "access", # Membre du cercle
}
DEFAULT_CANEDIT = "access" # all active authenticated users get at least read access
DRUPAL_ADMIN_UID = 1 # uid that also receives canadmin=1 (site admin)
TABLES = {"drupal_users", "drupal_profile_values", "drupal_users_roles"}
# ── SQL value parser ──────────────────────────────────────────────────────────
def parse_values(line: str) -> list:
"""
Parse one SQL VALUES row such as `(1, 'foo', NULL, 42)` into a Python list.
Returns strings for quoted/numeric values and None for NULL.
"""
s = line.strip()
if s.startswith("("):
s = s[1:]
for suffix in (");", "),", ")"):
if s.endswith(suffix):
s = s[: -len(suffix)]
break
ESCAPES = {
"n": "\n", "r": "\r", "t": "\t", "b": "\x08",
"\\": "\\", "'": "'", '"': '"', "0": "\x00", "Z": "\x1a",
}
values = []
i, n = 0, len(s)
while i < n:
while i < n and s[i] in " \t":
i += 1
if i >= n:
break
if s[i : i + 4] == "NULL":
values.append(None)
i += 4
elif s[i] == "'":
i += 1
buf = []
while i < n:
c = s[i]
if c == "\\" and i + 1 < n:
buf.append(ESCAPES.get(s[i + 1], s[i + 1]))
i += 2
elif c == "'":
i += 1
break
else:
buf.append(c)
i += 1
values.append("".join(buf))
else:
j = i
while j < n and s[j] != ",":
j += 1
values.append(s[i:j].strip())
i = j
while i < n and s[i] in " \t,":
i += 1
return values
# ── SQL dump streaming ────────────────────────────────────────────────────────
INSERT_RE = re.compile(
r"INSERT INTO `([^`]+)` \((.+)\) VALUES",
re.IGNORECASE,
)
def stream_tables(filepath: Path, targets: set) -> dict:
"""
Single-pass stream of the SQL dump.
Returns {table_name: {"cols": [...], "rows": [[...], ...]}} for each target table.
Handles multiple INSERT batches for the same table (phpMyAdmin splits large tables).
"""
result = {t: {"cols": [], "rows": []} for t in targets}
current = None
with open(filepath, encoding="utf-8", errors="replace") as fh:
for line in fh:
ls = line.rstrip("\r\n")
m = INSERT_RE.match(ls)
if m:
tname = m.group(1)
if tname in targets:
current = tname
cols = [c.strip().strip("`") for c in m.group(2).split(",")]
# Only set cols once (all batches share the same column order)
if not result[current]["cols"]:
result[current]["cols"] = cols
else:
current = None
continue
if current is None:
continue
stripped = ls.strip()
if stripped.startswith("("):
result[current]["rows"].append(parse_values(stripped))
if stripped.endswith(";"):
current = None
return result
def as_dicts(table: dict) -> list[dict]:
cols = table["cols"]
return [dict(zip(cols, row)) for row in table["rows"]]
# ── Data assembly ─────────────────────────────────────────────────────────────
def build_users(tables: dict) -> tuple[list[dict], list[tuple]]:
"""
Join drupal_users + profile_values + users_roles.
Returns (users_to_migrate, skipped_list).
"""
# Profile values: uid → {fid → value} (fid 2 = nom, fid 3 = prénom)
profiles: dict[int, dict[int, str]] = {}
for pv in as_dicts(tables["drupal_profile_values"]):
try:
fid = int(pv["fid"])
uid = int(pv["uid"])
except (TypeError, ValueError):
continue
if fid in (2, 3) and pv.get("value"):
profiles.setdefault(uid, {})[fid] = pv["value"].strip()
# Roles: uid → best canedit level ('admin' beats 'access')
uid_canedit: dict[int, str] = {}
for ur in as_dicts(tables["drupal_users_roles"]):
try:
uid = int(ur["uid"])
rid = int(ur["rid"])
except (TypeError, ValueError):
continue
if uid == 0:
continue
level = ROLE_CANEDIT.get(rid)
if level == "admin" or (level and uid not in uid_canedit):
uid_canedit[uid] = level
# Build final user list
seen_emails: dict[str, int] = {}
users: list[dict] = []
skipped: list[tuple] = []
for u in as_dicts(tables["drupal_users"]):
try:
uid = int(u["uid"])
except (TypeError, ValueError):
continue
if uid == 0:
continue # anonymous placeholder row
status = int(u.get("status") or 0)
if status == 0:
skipped.append((uid, u.get("name"), "blocked"))
continue
mail = (u.get("mail") or "").strip()
if not mail:
skipped.append((uid, u.get("name"), "no email"))
continue
mail_lc = mail.lower()
if mail_lc in seen_emails:
skipped.append((uid, u.get("name"), f"duplicate email (uid {seen_emails[mail_lc]} kept)"))
continue
seen_emails[mail_lc] = uid
# real_name: prefer "prénom nom" from profile, fall back to login name
p = profiles.get(uid, {})
nom = p.get(2, "")
prenom = p.get(3, "")
if prenom and nom:
real_name = f"{prenom} {nom}"
elif nom:
real_name = nom
elif prenom:
real_name = prenom
else:
real_name = u.get("name", "")
real_name = real_name[:64] # VARCHAR(64)
user_name = (u.get("name") or "").strip()[:32] # VARCHAR(32)
language = (u.get("language") or "fr").strip() or "fr"
created = str(u.get("created") or "0")
canedit = uid_canedit.get(uid, DEFAULT_CANEDIT)
canadmin = uid == DRUPAL_ADMIN_UID
users.append({
"user_id": uid,
"user_name": user_name,
"real_name": real_name,
"email": mail,
"language": language,
"created": created,
"canedit": canedit,
"canadmin": canadmin,
})
users.sort(key=lambda u: u["user_id"])
return users, skipped
# ── Password generation ───────────────────────────────────────────────────────
# Unambiguous character set (no l/1/0/O/I)
_ALPHABET = (
string.ascii_uppercase.replace("I", "").replace("O", "")
+ string.ascii_lowercase.replace("l", "")
+ string.digits.replace("0", "").replace("1", "")
)
def gen_password(length: int = 16) -> str:
return "".join(secrets.choice(_ALPHABET) for _ in range(length))
def hash_bcrypt(password: str) -> str:
"""Return a PHP-compatible bcrypt hash ($2y$ prefix, cost 12)."""
import bcrypt
h = bcrypt.hashpw(password.encode("utf-8"), bcrypt.gensalt(rounds=12))
# Python bcrypt emits $2b$; replace with $2y$ (both accepted by PHP password_verify)
return h.decode("ascii").replace("$2b$", "$2y$", 1)
# ── SQL rendering ─────────────────────────────────────────────────────────────
def sql_str(v) -> str:
if v is None:
return "NULL"
return "'" + str(v).replace("\\", "\\\\").replace("'", "\\'") + "'"
def render_sql(users: list[dict], pw_hashes: dict[int, str]) -> str:
lines = [
"-- ============================================================",
"-- Webtrees user import — generated from basesgen.sql",
f"-- Table prefix : {PREFIX}",
f"-- Tree ID : {TREE_ID}",
f"-- Users : {len(users)}",
"--",
"-- All imported users have a random temporary password.",
"-- Distribute webtrees_temp_passwords.csv and ask users to",
"-- change their password on first login.",
"--",
"-- Prerequisites: the Webtrees tree with gedcom_id=1 must",
"-- already exist before running this script.",
"-- ============================================================",
"",
"SET NAMES utf8mb4;",
"SET foreign_key_checks = 0;",
"",
]
# ── _wt_user ─────────────────────────────────────────────────────────────
lines.append(f"INSERT INTO `{PREFIX}_user`")
lines.append(" (`user_id`, `user_name`, `real_name`, `email`, `password`)")
lines.append("VALUES")
user_rows = [
f" ({u['user_id']}, {sql_str(u['user_name'])}, "
f"{sql_str(u['real_name'])}, {sql_str(u['email'])}, "
f"{sql_str(pw_hashes[u['user_id']])})"
for u in users
]
lines.append(",\n".join(user_rows) + ";")
lines.append("")
# ── _wt_user_setting ──────────────────────────────────────────────────────
setting_rows = []
for u in users:
uid = u["user_id"]
for name, value in [
("verified", "1"),
("verified_by_admin", "1"),
("contactmethod", "messaging"),
("language", u["language"]),
("timezone", "Europe/Paris"),
("reg_timestamp", u["created"]),
]:
setting_rows.append(
f" ({uid}, {sql_str(name)}, {sql_str(value)})"
)
if u["canadmin"]:
setting_rows.append(f" ({uid}, 'canadmin', '1')")
lines.append(f"INSERT INTO `{PREFIX}_user_setting`")
lines.append(" (`user_id`, `setting_name`, `setting_value`)")
lines.append("VALUES")
lines.append(",\n".join(setting_rows) + ";")
lines.append("")
# ── _wt_user_gedcom_setting ───────────────────────────────────────────────
gedcom_rows = [
f" ({u['user_id']}, {TREE_ID}, 'canedit', {sql_str(u['canedit'])})"
for u in users
]
lines.append(f"INSERT INTO `{PREFIX}_user_gedcom_setting`")
lines.append(" (`user_id`, `gedcom_id`, `setting_name`, `setting_value`)")
lines.append("VALUES")
lines.append(",\n".join(gedcom_rows) + ";")
lines.append("")
# Reset AUTO_INCREMENT past the highest imported uid
max_uid = max(u["user_id"] for u in users)
lines.append(f"ALTER TABLE `{PREFIX}_user` AUTO_INCREMENT = {max_uid + 1};")
lines.append("")
lines.append("SET foreign_key_checks = 1;")
lines.append("")
return "\n".join(lines)
# ── Main ──────────────────────────────────────────────────────────────────────
def main():
print(f"Streaming {SQL_FILE}", flush=True)
tables = stream_tables(SQL_FILE, TABLES)
for tname, tdata in tables.items():
print(f" {tname}: {len(tdata['rows'])} rows")
users, skipped = build_users(tables)
print(f"\nActive users to migrate : {len(users)}")
print(f"Skipped : {len(skipped)}")
for uid, name, reason in skipped:
print(f" SKIP uid={uid:5d} {(name or ''):<20s} {reason}")
# Role summary
from collections import Counter
role_counts = Counter(u["canedit"] for u in users)
admin_count = sum(1 for u in users if u["canadmin"])
print(f"\nTree access — admin: {role_counts['admin']} access: {role_counts['access']} none: {role_counts.get('none', 0)}")
print(f"Site admin — {admin_count} user(s)")
print(f"\nHashing {len(users)} passwords (bcrypt cost=12) …", flush=True)
plain_pws: dict[int, str] = {}
pw_hashes: dict[int, str] = {}
for i, u in enumerate(users, 1):
pw = gen_password()
plain_pws[u["user_id"]] = pw
pw_hashes[u["user_id"]] = hash_bcrypt(pw)
print(f" [{i:3d}/{len(users)}] uid={u['user_id']:5d} {u['user_name']}", end="\r", flush=True)
print()
print(f"\nWriting {OUT_SQL}")
OUT_SQL.write_text(render_sql(users, pw_hashes), encoding="utf-8")
print(f"Writing {OUT_CSV}")
with OUT_CSV.open("w", newline="", encoding="utf-8") as cf:
w = csv.writer(cf)
w.writerow(["user_id", "user_name", "real_name", "email", "temp_password"])
for u in users:
w.writerow([
u["user_id"], u["user_name"], u["real_name"],
u["email"], plain_pws[u["user_id"]],
])
print("\nDone.")
print(f" {OUT_SQL.name} ({OUT_SQL.stat().st_size:,} bytes)")
print(f" {OUT_CSV.name} ({OUT_CSV.stat().st_size:,} bytes)")
if __name__ == "__main__":
main()