f75cbebb44
Includes export_lignees_to_gedcom.py (Drupal book → GEDCOM 5.5.1), export_users_to_webtrees.py, generated GEDCOM files for 16 family lineages, and webtrees user import SQL. Excludes basesgen.sql (966 MB) and webtrees_temp_passwords.csv (sensitive). Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
413 lines
14 KiB
Python
413 lines
14 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Drupal 6 → Webtrees 2 user migration.
|
|
|
|
Streams basesgen.sql (without importing it) and writes:
|
|
webtrees_users_import.sql — SQL to run against the Webtrees database
|
|
webtrees_temp_passwords.csv — temporary passwords to distribute to users
|
|
|
|
Usage:
|
|
python3 export_users_to_webtrees.py
|
|
|
|
Requires: Python ≥ 3.8, bcrypt package (pip install bcrypt)
|
|
"""
|
|
|
|
import csv
|
|
import re
|
|
import secrets
|
|
import string
|
|
import sys
|
|
from pathlib import Path
|
|
|
|
# ── Configuration ─────────────────────────────────────────────────────────────
|
|
|
|
SQL_FILE = Path("/home/yann64/BaseCGL/basesgen.sql")
|
|
OUT_SQL = Path("/home/yann64/BaseCGL/webtrees_users_import.sql")
|
|
OUT_CSV = Path("/home/yann64/BaseCGL/webtrees_temp_passwords.csv")
|
|
|
|
PREFIX = "_wt" # Webtrees table prefix
|
|
TREE_ID = 1 # Target tree gedcom_id
|
|
|
|
# Drupal role id → Webtrees canedit level.
|
|
# rid=5 appears in the data but not in drupal_role (deleted role); treated as 'access'.
|
|
ROLE_CANEDIT = {
|
|
4: "admin", # Administrateur
|
|
3: "access", # Membre du cercle
|
|
}
|
|
DEFAULT_CANEDIT = "access" # all active authenticated users get at least read access
|
|
|
|
DRUPAL_ADMIN_UID = 1 # uid that also receives canadmin=1 (site admin)
|
|
|
|
TABLES = {"drupal_users", "drupal_profile_values", "drupal_users_roles"}
|
|
|
|
# ── SQL value parser ──────────────────────────────────────────────────────────
|
|
|
|
def parse_values(line: str) -> list:
|
|
"""
|
|
Parse one SQL VALUES row such as `(1, 'foo', NULL, 42)` into a Python list.
|
|
Returns strings for quoted/numeric values and None for NULL.
|
|
"""
|
|
s = line.strip()
|
|
if s.startswith("("):
|
|
s = s[1:]
|
|
for suffix in (");", "),", ")"):
|
|
if s.endswith(suffix):
|
|
s = s[: -len(suffix)]
|
|
break
|
|
|
|
ESCAPES = {
|
|
"n": "\n", "r": "\r", "t": "\t", "b": "\x08",
|
|
"\\": "\\", "'": "'", '"': '"', "0": "\x00", "Z": "\x1a",
|
|
}
|
|
|
|
values = []
|
|
i, n = 0, len(s)
|
|
|
|
while i < n:
|
|
while i < n and s[i] in " \t":
|
|
i += 1
|
|
if i >= n:
|
|
break
|
|
|
|
if s[i : i + 4] == "NULL":
|
|
values.append(None)
|
|
i += 4
|
|
elif s[i] == "'":
|
|
i += 1
|
|
buf = []
|
|
while i < n:
|
|
c = s[i]
|
|
if c == "\\" and i + 1 < n:
|
|
buf.append(ESCAPES.get(s[i + 1], s[i + 1]))
|
|
i += 2
|
|
elif c == "'":
|
|
i += 1
|
|
break
|
|
else:
|
|
buf.append(c)
|
|
i += 1
|
|
values.append("".join(buf))
|
|
else:
|
|
j = i
|
|
while j < n and s[j] != ",":
|
|
j += 1
|
|
values.append(s[i:j].strip())
|
|
i = j
|
|
|
|
while i < n and s[i] in " \t,":
|
|
i += 1
|
|
|
|
return values
|
|
|
|
|
|
# ── SQL dump streaming ────────────────────────────────────────────────────────
|
|
|
|
INSERT_RE = re.compile(
|
|
r"INSERT INTO `([^`]+)` \((.+)\) VALUES",
|
|
re.IGNORECASE,
|
|
)
|
|
|
|
|
|
def stream_tables(filepath: Path, targets: set) -> dict:
|
|
"""
|
|
Single-pass stream of the SQL dump.
|
|
Returns {table_name: {"cols": [...], "rows": [[...], ...]}} for each target table.
|
|
Handles multiple INSERT batches for the same table (phpMyAdmin splits large tables).
|
|
"""
|
|
result = {t: {"cols": [], "rows": []} for t in targets}
|
|
current = None
|
|
|
|
with open(filepath, encoding="utf-8", errors="replace") as fh:
|
|
for line in fh:
|
|
ls = line.rstrip("\r\n")
|
|
|
|
m = INSERT_RE.match(ls)
|
|
if m:
|
|
tname = m.group(1)
|
|
if tname in targets:
|
|
current = tname
|
|
cols = [c.strip().strip("`") for c in m.group(2).split(",")]
|
|
# Only set cols once (all batches share the same column order)
|
|
if not result[current]["cols"]:
|
|
result[current]["cols"] = cols
|
|
else:
|
|
current = None
|
|
continue
|
|
|
|
if current is None:
|
|
continue
|
|
|
|
stripped = ls.strip()
|
|
if stripped.startswith("("):
|
|
result[current]["rows"].append(parse_values(stripped))
|
|
|
|
if stripped.endswith(";"):
|
|
current = None
|
|
|
|
return result
|
|
|
|
|
|
def as_dicts(table: dict) -> list[dict]:
|
|
cols = table["cols"]
|
|
return [dict(zip(cols, row)) for row in table["rows"]]
|
|
|
|
|
|
# ── Data assembly ─────────────────────────────────────────────────────────────
|
|
|
|
def build_users(tables: dict) -> tuple[list[dict], list[tuple]]:
|
|
"""
|
|
Join drupal_users + profile_values + users_roles.
|
|
Returns (users_to_migrate, skipped_list).
|
|
"""
|
|
# Profile values: uid → {fid → value} (fid 2 = nom, fid 3 = prénom)
|
|
profiles: dict[int, dict[int, str]] = {}
|
|
for pv in as_dicts(tables["drupal_profile_values"]):
|
|
try:
|
|
fid = int(pv["fid"])
|
|
uid = int(pv["uid"])
|
|
except (TypeError, ValueError):
|
|
continue
|
|
if fid in (2, 3) and pv.get("value"):
|
|
profiles.setdefault(uid, {})[fid] = pv["value"].strip()
|
|
|
|
# Roles: uid → best canedit level ('admin' beats 'access')
|
|
uid_canedit: dict[int, str] = {}
|
|
for ur in as_dicts(tables["drupal_users_roles"]):
|
|
try:
|
|
uid = int(ur["uid"])
|
|
rid = int(ur["rid"])
|
|
except (TypeError, ValueError):
|
|
continue
|
|
if uid == 0:
|
|
continue
|
|
level = ROLE_CANEDIT.get(rid)
|
|
if level == "admin" or (level and uid not in uid_canedit):
|
|
uid_canedit[uid] = level
|
|
|
|
# Build final user list
|
|
seen_emails: dict[str, int] = {}
|
|
users: list[dict] = []
|
|
skipped: list[tuple] = []
|
|
|
|
for u in as_dicts(tables["drupal_users"]):
|
|
try:
|
|
uid = int(u["uid"])
|
|
except (TypeError, ValueError):
|
|
continue
|
|
|
|
if uid == 0:
|
|
continue # anonymous placeholder row
|
|
|
|
status = int(u.get("status") or 0)
|
|
if status == 0:
|
|
skipped.append((uid, u.get("name"), "blocked"))
|
|
continue
|
|
|
|
mail = (u.get("mail") or "").strip()
|
|
if not mail:
|
|
skipped.append((uid, u.get("name"), "no email"))
|
|
continue
|
|
|
|
mail_lc = mail.lower()
|
|
if mail_lc in seen_emails:
|
|
skipped.append((uid, u.get("name"), f"duplicate email (uid {seen_emails[mail_lc]} kept)"))
|
|
continue
|
|
seen_emails[mail_lc] = uid
|
|
|
|
# real_name: prefer "prénom nom" from profile, fall back to login name
|
|
p = profiles.get(uid, {})
|
|
nom = p.get(2, "")
|
|
prenom = p.get(3, "")
|
|
if prenom and nom:
|
|
real_name = f"{prenom} {nom}"
|
|
elif nom:
|
|
real_name = nom
|
|
elif prenom:
|
|
real_name = prenom
|
|
else:
|
|
real_name = u.get("name", "")
|
|
real_name = real_name[:64] # VARCHAR(64)
|
|
|
|
user_name = (u.get("name") or "").strip()[:32] # VARCHAR(32)
|
|
language = (u.get("language") or "fr").strip() or "fr"
|
|
created = str(u.get("created") or "0")
|
|
canedit = uid_canedit.get(uid, DEFAULT_CANEDIT)
|
|
canadmin = uid == DRUPAL_ADMIN_UID
|
|
|
|
users.append({
|
|
"user_id": uid,
|
|
"user_name": user_name,
|
|
"real_name": real_name,
|
|
"email": mail,
|
|
"language": language,
|
|
"created": created,
|
|
"canedit": canedit,
|
|
"canadmin": canadmin,
|
|
})
|
|
|
|
users.sort(key=lambda u: u["user_id"])
|
|
return users, skipped
|
|
|
|
|
|
# ── Password generation ───────────────────────────────────────────────────────
|
|
|
|
# Unambiguous character set (no l/1/0/O/I)
|
|
_ALPHABET = (
|
|
string.ascii_uppercase.replace("I", "").replace("O", "")
|
|
+ string.ascii_lowercase.replace("l", "")
|
|
+ string.digits.replace("0", "").replace("1", "")
|
|
)
|
|
|
|
|
|
def gen_password(length: int = 16) -> str:
|
|
return "".join(secrets.choice(_ALPHABET) for _ in range(length))
|
|
|
|
|
|
def hash_bcrypt(password: str) -> str:
|
|
"""Return a PHP-compatible bcrypt hash ($2y$ prefix, cost 12)."""
|
|
import bcrypt
|
|
h = bcrypt.hashpw(password.encode("utf-8"), bcrypt.gensalt(rounds=12))
|
|
# Python bcrypt emits $2b$; replace with $2y$ (both accepted by PHP password_verify)
|
|
return h.decode("ascii").replace("$2b$", "$2y$", 1)
|
|
|
|
|
|
# ── SQL rendering ─────────────────────────────────────────────────────────────
|
|
|
|
def sql_str(v) -> str:
|
|
if v is None:
|
|
return "NULL"
|
|
return "'" + str(v).replace("\\", "\\\\").replace("'", "\\'") + "'"
|
|
|
|
|
|
def render_sql(users: list[dict], pw_hashes: dict[int, str]) -> str:
|
|
lines = [
|
|
"-- ============================================================",
|
|
"-- Webtrees user import — generated from basesgen.sql",
|
|
f"-- Table prefix : {PREFIX}",
|
|
f"-- Tree ID : {TREE_ID}",
|
|
f"-- Users : {len(users)}",
|
|
"--",
|
|
"-- All imported users have a random temporary password.",
|
|
"-- Distribute webtrees_temp_passwords.csv and ask users to",
|
|
"-- change their password on first login.",
|
|
"--",
|
|
"-- Prerequisites: the Webtrees tree with gedcom_id=1 must",
|
|
"-- already exist before running this script.",
|
|
"-- ============================================================",
|
|
"",
|
|
"SET NAMES utf8mb4;",
|
|
"SET foreign_key_checks = 0;",
|
|
"",
|
|
]
|
|
|
|
# ── _wt_user ─────────────────────────────────────────────────────────────
|
|
lines.append(f"INSERT INTO `{PREFIX}_user`")
|
|
lines.append(" (`user_id`, `user_name`, `real_name`, `email`, `password`)")
|
|
lines.append("VALUES")
|
|
user_rows = [
|
|
f" ({u['user_id']}, {sql_str(u['user_name'])}, "
|
|
f"{sql_str(u['real_name'])}, {sql_str(u['email'])}, "
|
|
f"{sql_str(pw_hashes[u['user_id']])})"
|
|
for u in users
|
|
]
|
|
lines.append(",\n".join(user_rows) + ";")
|
|
lines.append("")
|
|
|
|
# ── _wt_user_setting ──────────────────────────────────────────────────────
|
|
setting_rows = []
|
|
for u in users:
|
|
uid = u["user_id"]
|
|
for name, value in [
|
|
("verified", "1"),
|
|
("verified_by_admin", "1"),
|
|
("contactmethod", "messaging"),
|
|
("language", u["language"]),
|
|
("timezone", "Europe/Paris"),
|
|
("reg_timestamp", u["created"]),
|
|
]:
|
|
setting_rows.append(
|
|
f" ({uid}, {sql_str(name)}, {sql_str(value)})"
|
|
)
|
|
if u["canadmin"]:
|
|
setting_rows.append(f" ({uid}, 'canadmin', '1')")
|
|
|
|
lines.append(f"INSERT INTO `{PREFIX}_user_setting`")
|
|
lines.append(" (`user_id`, `setting_name`, `setting_value`)")
|
|
lines.append("VALUES")
|
|
lines.append(",\n".join(setting_rows) + ";")
|
|
lines.append("")
|
|
|
|
# ── _wt_user_gedcom_setting ───────────────────────────────────────────────
|
|
gedcom_rows = [
|
|
f" ({u['user_id']}, {TREE_ID}, 'canedit', {sql_str(u['canedit'])})"
|
|
for u in users
|
|
]
|
|
lines.append(f"INSERT INTO `{PREFIX}_user_gedcom_setting`")
|
|
lines.append(" (`user_id`, `gedcom_id`, `setting_name`, `setting_value`)")
|
|
lines.append("VALUES")
|
|
lines.append(",\n".join(gedcom_rows) + ";")
|
|
lines.append("")
|
|
|
|
# Reset AUTO_INCREMENT past the highest imported uid
|
|
max_uid = max(u["user_id"] for u in users)
|
|
lines.append(f"ALTER TABLE `{PREFIX}_user` AUTO_INCREMENT = {max_uid + 1};")
|
|
lines.append("")
|
|
lines.append("SET foreign_key_checks = 1;")
|
|
lines.append("")
|
|
|
|
return "\n".join(lines)
|
|
|
|
|
|
# ── Main ──────────────────────────────────────────────────────────────────────
|
|
|
|
def main():
|
|
print(f"Streaming {SQL_FILE} …", flush=True)
|
|
tables = stream_tables(SQL_FILE, TABLES)
|
|
|
|
for tname, tdata in tables.items():
|
|
print(f" {tname}: {len(tdata['rows'])} rows")
|
|
|
|
users, skipped = build_users(tables)
|
|
print(f"\nActive users to migrate : {len(users)}")
|
|
print(f"Skipped : {len(skipped)}")
|
|
for uid, name, reason in skipped:
|
|
print(f" SKIP uid={uid:5d} {(name or ''):<20s} {reason}")
|
|
|
|
# Role summary
|
|
from collections import Counter
|
|
role_counts = Counter(u["canedit"] for u in users)
|
|
admin_count = sum(1 for u in users if u["canadmin"])
|
|
print(f"\nTree access — admin: {role_counts['admin']} access: {role_counts['access']} none: {role_counts.get('none', 0)}")
|
|
print(f"Site admin — {admin_count} user(s)")
|
|
|
|
print(f"\nHashing {len(users)} passwords (bcrypt cost=12) …", flush=True)
|
|
plain_pws: dict[int, str] = {}
|
|
pw_hashes: dict[int, str] = {}
|
|
for i, u in enumerate(users, 1):
|
|
pw = gen_password()
|
|
plain_pws[u["user_id"]] = pw
|
|
pw_hashes[u["user_id"]] = hash_bcrypt(pw)
|
|
print(f" [{i:3d}/{len(users)}] uid={u['user_id']:5d} {u['user_name']}", end="\r", flush=True)
|
|
print()
|
|
|
|
print(f"\nWriting {OUT_SQL} …")
|
|
OUT_SQL.write_text(render_sql(users, pw_hashes), encoding="utf-8")
|
|
|
|
print(f"Writing {OUT_CSV} …")
|
|
with OUT_CSV.open("w", newline="", encoding="utf-8") as cf:
|
|
w = csv.writer(cf)
|
|
w.writerow(["user_id", "user_name", "real_name", "email", "temp_password"])
|
|
for u in users:
|
|
w.writerow([
|
|
u["user_id"], u["user_name"], u["real_name"],
|
|
u["email"], plain_pws[u["user_id"]],
|
|
])
|
|
|
|
print("\nDone.")
|
|
print(f" {OUT_SQL.name} ({OUT_SQL.stat().st_size:,} bytes)")
|
|
print(f" {OUT_CSV.name} ({OUT_CSV.stat().st_size:,} bytes)")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|