#!/usr/bin/env python3 """ Drupal 6 → Webtrees 2 user migration. Streams basesgen.sql (without importing it) and writes: webtrees_users_import.sql — SQL to run against the Webtrees database webtrees_temp_passwords.csv — temporary passwords to distribute to users Usage: python3 export_users_to_webtrees.py Requires: Python ≥ 3.8, bcrypt package (pip install bcrypt) """ import csv import re import secrets import string import sys from pathlib import Path # ── Configuration ───────────────────────────────────────────────────────────── SQL_FILE = Path("/home/yann64/BaseCGL/basesgen.sql") OUT_SQL = Path("/home/yann64/BaseCGL/webtrees_users_import.sql") OUT_CSV = Path("/home/yann64/BaseCGL/webtrees_temp_passwords.csv") PREFIX = "_wt" # Webtrees table prefix TREE_ID = 1 # Target tree gedcom_id # Drupal role id → Webtrees canedit level. # rid=5 appears in the data but not in drupal_role (deleted role); treated as 'access'. ROLE_CANEDIT = { 4: "admin", # Administrateur 3: "access", # Membre du cercle } DEFAULT_CANEDIT = "access" # all active authenticated users get at least read access DRUPAL_ADMIN_UID = 1 # uid that also receives canadmin=1 (site admin) TABLES = {"drupal_users", "drupal_profile_values", "drupal_users_roles"} # ── SQL value parser ────────────────────────────────────────────────────────── def parse_values(line: str) -> list: """ Parse one SQL VALUES row such as `(1, 'foo', NULL, 42)` into a Python list. Returns strings for quoted/numeric values and None for NULL. """ s = line.strip() if s.startswith("("): s = s[1:] for suffix in (");", "),", ")"): if s.endswith(suffix): s = s[: -len(suffix)] break ESCAPES = { "n": "\n", "r": "\r", "t": "\t", "b": "\x08", "\\": "\\", "'": "'", '"': '"', "0": "\x00", "Z": "\x1a", } values = [] i, n = 0, len(s) while i < n: while i < n and s[i] in " \t": i += 1 if i >= n: break if s[i : i + 4] == "NULL": values.append(None) i += 4 elif s[i] == "'": i += 1 buf = [] while i < n: c = s[i] if c == "\\" and i + 1 < n: buf.append(ESCAPES.get(s[i + 1], s[i + 1])) i += 2 elif c == "'": i += 1 break else: buf.append(c) i += 1 values.append("".join(buf)) else: j = i while j < n and s[j] != ",": j += 1 values.append(s[i:j].strip()) i = j while i < n and s[i] in " \t,": i += 1 return values # ── SQL dump streaming ──────────────────────────────────────────────────────── INSERT_RE = re.compile( r"INSERT INTO `([^`]+)` \((.+)\) VALUES", re.IGNORECASE, ) def stream_tables(filepath: Path, targets: set) -> dict: """ Single-pass stream of the SQL dump. Returns {table_name: {"cols": [...], "rows": [[...], ...]}} for each target table. Handles multiple INSERT batches for the same table (phpMyAdmin splits large tables). """ result = {t: {"cols": [], "rows": []} for t in targets} current = None with open(filepath, encoding="utf-8", errors="replace") as fh: for line in fh: ls = line.rstrip("\r\n") m = INSERT_RE.match(ls) if m: tname = m.group(1) if tname in targets: current = tname cols = [c.strip().strip("`") for c in m.group(2).split(",")] # Only set cols once (all batches share the same column order) if not result[current]["cols"]: result[current]["cols"] = cols else: current = None continue if current is None: continue stripped = ls.strip() if stripped.startswith("("): result[current]["rows"].append(parse_values(stripped)) if stripped.endswith(";"): current = None return result def as_dicts(table: dict) -> list[dict]: cols = table["cols"] return [dict(zip(cols, row)) for row in table["rows"]] # ── Data assembly ───────────────────────────────────────────────────────────── def build_users(tables: dict) -> tuple[list[dict], list[tuple]]: """ Join drupal_users + profile_values + users_roles. Returns (users_to_migrate, skipped_list). """ # Profile values: uid → {fid → value} (fid 2 = nom, fid 3 = prénom) profiles: dict[int, dict[int, str]] = {} for pv in as_dicts(tables["drupal_profile_values"]): try: fid = int(pv["fid"]) uid = int(pv["uid"]) except (TypeError, ValueError): continue if fid in (2, 3) and pv.get("value"): profiles.setdefault(uid, {})[fid] = pv["value"].strip() # Roles: uid → best canedit level ('admin' beats 'access') uid_canedit: dict[int, str] = {} for ur in as_dicts(tables["drupal_users_roles"]): try: uid = int(ur["uid"]) rid = int(ur["rid"]) except (TypeError, ValueError): continue if uid == 0: continue level = ROLE_CANEDIT.get(rid) if level == "admin" or (level and uid not in uid_canedit): uid_canedit[uid] = level # Build final user list seen_emails: dict[str, int] = {} users: list[dict] = [] skipped: list[tuple] = [] for u in as_dicts(tables["drupal_users"]): try: uid = int(u["uid"]) except (TypeError, ValueError): continue if uid == 0: continue # anonymous placeholder row status = int(u.get("status") or 0) if status == 0: skipped.append((uid, u.get("name"), "blocked")) continue mail = (u.get("mail") or "").strip() if not mail: skipped.append((uid, u.get("name"), "no email")) continue mail_lc = mail.lower() if mail_lc in seen_emails: skipped.append((uid, u.get("name"), f"duplicate email (uid {seen_emails[mail_lc]} kept)")) continue seen_emails[mail_lc] = uid # real_name: prefer "prénom nom" from profile, fall back to login name p = profiles.get(uid, {}) nom = p.get(2, "") prenom = p.get(3, "") if prenom and nom: real_name = f"{prenom} {nom}" elif nom: real_name = nom elif prenom: real_name = prenom else: real_name = u.get("name", "") real_name = real_name[:64] # VARCHAR(64) user_name = (u.get("name") or "").strip()[:32] # VARCHAR(32) language = (u.get("language") or "fr").strip() or "fr" created = str(u.get("created") or "0") canedit = uid_canedit.get(uid, DEFAULT_CANEDIT) canadmin = uid == DRUPAL_ADMIN_UID users.append({ "user_id": uid, "user_name": user_name, "real_name": real_name, "email": mail, "language": language, "created": created, "canedit": canedit, "canadmin": canadmin, }) users.sort(key=lambda u: u["user_id"]) return users, skipped # ── Password generation ─────────────────────────────────────────────────────── # Unambiguous character set (no l/1/0/O/I) _ALPHABET = ( string.ascii_uppercase.replace("I", "").replace("O", "") + string.ascii_lowercase.replace("l", "") + string.digits.replace("0", "").replace("1", "") ) def gen_password(length: int = 16) -> str: return "".join(secrets.choice(_ALPHABET) for _ in range(length)) def hash_bcrypt(password: str) -> str: """Return a PHP-compatible bcrypt hash ($2y$ prefix, cost 12).""" import bcrypt h = bcrypt.hashpw(password.encode("utf-8"), bcrypt.gensalt(rounds=12)) # Python bcrypt emits $2b$; replace with $2y$ (both accepted by PHP password_verify) return h.decode("ascii").replace("$2b$", "$2y$", 1) # ── SQL rendering ───────────────────────────────────────────────────────────── def sql_str(v) -> str: if v is None: return "NULL" return "'" + str(v).replace("\\", "\\\\").replace("'", "\\'") + "'" def render_sql(users: list[dict], pw_hashes: dict[int, str]) -> str: lines = [ "-- ============================================================", "-- Webtrees user import — generated from basesgen.sql", f"-- Table prefix : {PREFIX}", f"-- Tree ID : {TREE_ID}", f"-- Users : {len(users)}", "--", "-- All imported users have a random temporary password.", "-- Distribute webtrees_temp_passwords.csv and ask users to", "-- change their password on first login.", "--", "-- Prerequisites: the Webtrees tree with gedcom_id=1 must", "-- already exist before running this script.", "-- ============================================================", "", "SET NAMES utf8mb4;", "SET foreign_key_checks = 0;", "", ] # ── _wt_user ───────────────────────────────────────────────────────────── lines.append(f"INSERT INTO `{PREFIX}_user`") lines.append(" (`user_id`, `user_name`, `real_name`, `email`, `password`)") lines.append("VALUES") user_rows = [ f" ({u['user_id']}, {sql_str(u['user_name'])}, " f"{sql_str(u['real_name'])}, {sql_str(u['email'])}, " f"{sql_str(pw_hashes[u['user_id']])})" for u in users ] lines.append(",\n".join(user_rows) + ";") lines.append("") # ── _wt_user_setting ────────────────────────────────────────────────────── setting_rows = [] for u in users: uid = u["user_id"] for name, value in [ ("verified", "1"), ("verified_by_admin", "1"), ("contactmethod", "messaging"), ("language", u["language"]), ("timezone", "Europe/Paris"), ("reg_timestamp", u["created"]), ]: setting_rows.append( f" ({uid}, {sql_str(name)}, {sql_str(value)})" ) if u["canadmin"]: setting_rows.append(f" ({uid}, 'canadmin', '1')") lines.append(f"INSERT INTO `{PREFIX}_user_setting`") lines.append(" (`user_id`, `setting_name`, `setting_value`)") lines.append("VALUES") lines.append(",\n".join(setting_rows) + ";") lines.append("") # ── _wt_user_gedcom_setting ─────────────────────────────────────────────── gedcom_rows = [ f" ({u['user_id']}, {TREE_ID}, 'canedit', {sql_str(u['canedit'])})" for u in users ] lines.append(f"INSERT INTO `{PREFIX}_user_gedcom_setting`") lines.append(" (`user_id`, `gedcom_id`, `setting_name`, `setting_value`)") lines.append("VALUES") lines.append(",\n".join(gedcom_rows) + ";") lines.append("") # Reset AUTO_INCREMENT past the highest imported uid max_uid = max(u["user_id"] for u in users) lines.append(f"ALTER TABLE `{PREFIX}_user` AUTO_INCREMENT = {max_uid + 1};") lines.append("") lines.append("SET foreign_key_checks = 1;") lines.append("") return "\n".join(lines) # ── Main ────────────────────────────────────────────────────────────────────── def main(): print(f"Streaming {SQL_FILE} …", flush=True) tables = stream_tables(SQL_FILE, TABLES) for tname, tdata in tables.items(): print(f" {tname}: {len(tdata['rows'])} rows") users, skipped = build_users(tables) print(f"\nActive users to migrate : {len(users)}") print(f"Skipped : {len(skipped)}") for uid, name, reason in skipped: print(f" SKIP uid={uid:5d} {(name or ''):<20s} {reason}") # Role summary from collections import Counter role_counts = Counter(u["canedit"] for u in users) admin_count = sum(1 for u in users if u["canadmin"]) print(f"\nTree access — admin: {role_counts['admin']} access: {role_counts['access']} none: {role_counts.get('none', 0)}") print(f"Site admin — {admin_count} user(s)") print(f"\nHashing {len(users)} passwords (bcrypt cost=12) …", flush=True) plain_pws: dict[int, str] = {} pw_hashes: dict[int, str] = {} for i, u in enumerate(users, 1): pw = gen_password() plain_pws[u["user_id"]] = pw pw_hashes[u["user_id"]] = hash_bcrypt(pw) print(f" [{i:3d}/{len(users)}] uid={u['user_id']:5d} {u['user_name']}", end="\r", flush=True) print() print(f"\nWriting {OUT_SQL} …") OUT_SQL.write_text(render_sql(users, pw_hashes), encoding="utf-8") print(f"Writing {OUT_CSV} …") with OUT_CSV.open("w", newline="", encoding="utf-8") as cf: w = csv.writer(cf) w.writerow(["user_id", "user_name", "real_name", "email", "temp_password"]) for u in users: w.writerow([ u["user_id"], u["user_name"], u["real_name"], u["email"], plain_pws[u["user_id"]], ]) print("\nDone.") print(f" {OUT_SQL.name} ({OUT_SQL.stat().st_size:,} bytes)") print(f" {OUT_CSV.name} ({OUT_CSV.stat().st_size:,} bytes)") if __name__ == "__main__": main()