Files
jongryangje/writable/tools/sync_basic_codes_from_csv.py

146 lines
4.7 KiB
Python
Raw Normal View History

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Parse 종량제_개발목록_20260127(기본코드 종류).csv → writable/database/code_master_sync_from_csv.sql"""
from __future__ import annotations
import csv
import re
import sys
from pathlib import Path
def read_csv_rows(path: Path) -> list[list[str]]:
    """Read the CSV file at *path* and return every record as a list of fields.

    The file is decoded as UTF-8 with an optional BOM (``utf-8-sig``) and
    opened with ``newline=""`` so the ``csv`` module performs its own
    line-ending handling.  This keeps line breaks embedded in quoted cells
    intact and prevents a record from being cut at other Unicode line
    boundaries (form feed, U+2028, ...), which ``str.splitlines`` would do.

    Args:
        path: Location of the CSV file.

    Returns:
        One list of raw (unstripped) field strings per CSV record; blank
        lines come back as empty lists.
    """
    with path.open(encoding="utf-8-sig", newline="") as handle:
        return list(csv.reader(handle))
def extract_pairs(fields: list[str], first_code_idx: int, ncols: int, step: int = 3) -> list[tuple[str, str]]:
    """Pull ``ncols`` (code, name) pairs out of one CSV record.

    The k-th pair is read from ``fields[first_code_idx + k * step]`` (code)
    and the field immediately after it (name); both are whitespace-stripped.
    When the name index falls beyond the end of ``fields`` the pair is
    returned as ``("", "")`` so the result always has ``ncols`` entries.
    """
    field_count = len(fields)
    code_positions = (first_code_idx + k * step for k in range(ncols))
    return [
        (fields[pos].strip(), fields[pos + 1].strip())
        if pos + 1 < field_count
        else ("", "")
        for pos in code_positions
    ]
# Cells matching this pattern are header / explanatory text inside the sheet,
# not real detail codes (matched case-insensitively).
SKIP_NAME = re.compile(r"코드\s*[ABCD]|순번\s*두자리|등록되는구의", re.I)


def valid_detail(cd_code: str, cd_name: str) -> bool:
    """Tell whether a (code, name) cell pair is a real detail-code entry.

    A pair is rejected when either cell is empty, when the code cell holds
    one of the column-header labels, or when either cell matches
    ``SKIP_NAME``.
    """
    if not (cd_code and cd_name):
        return False
    if cd_code in {"세부코드", "코드명"}:
        return False
    return not any(SKIP_NAME.search(cell) for cell in (cd_name, cd_code))
def dedup_pairs(pairs: list[tuple[str, str]]) -> list[tuple[str, str]]:
    """Drop pairs whose code was already seen, keeping the first occurrence.

    The relative order of the surviving pairs is the order in which their
    codes first appear in ``pairs``.
    """
    # dicts preserve insertion order, and setdefault never overwrites an
    # existing key, so the first pair recorded for a code is the one kept.
    first_by_code: dict[str, tuple[str, str]] = {}
    for code, name in pairs:
        first_by_code.setdefault(code, (code, name))
    return list(first_by_code.values())
# Every code-kind letter handled by this script, in output order.
_KIND_LETTERS = "ABCDEFGHIJKLMNOPQRSTUVWXY"

# (row slice, kind letters) for the three detail tables in the CSV.  Within a
# row, each letter owns a (code, name) column pair; the first pair starts at
# field index 3 and the following ones are spaced 3 columns apart.
_DETAIL_BLOCKS = (
    # Block A-I: from roughly the 4th CSV row (rows[3]) up to just before the
    # dong / memo rows.
    (slice(3, 32), "ABCDEFGHI"),
    # Block J-R: data rows.
    (slice(37, 64), "JKLMNOPQR"),
    # Block S-Y: everything from rows[68] to the end of the file.
    (slice(68, None), "STUVWXY"),
)


def _sql_quote(text: str) -> str:
    """Double single quotes so *text* can sit inside a '...' SQL literal."""
    # NOTE(review): backslashes are passed through unchanged.  MySQL treats
    # them as escape characters unless NO_BACKSLASH_ESCAPES is enabled, so
    # confirm the CSV never contains any.
    return text.replace("'", "''")


def _collect_details(
    rows: list[list[str]],
    letters: str,
    details: dict[str, list[tuple[str, str]]],
) -> None:
    """Append every valid (code, name) pair found in *rows* to *details*.

    The k-th letter of *letters* receives the k-th pair of each row.  Rows
    with fewer than 5 fields are skipped.
    """
    for fields in rows:
        if len(fields) < 5:
            continue
        pairs = extract_pairs(fields, 3, len(letters), step=3)
        for letter, (code, name) in zip(letters, pairs):
            if valid_detail(code, name):
                details[letter].append((code, name))


def main() -> int:
    """Generate ``code_master_sync_from_csv.sql`` from the basic-code CSV.

    Reads the CSV under ``docs/`` (relative to the project root, two levels
    above this script's directory), collects the kind names (A-Y) and their
    detail codes, and writes idempotent ``INSERT ... WHERE NOT EXISTS``
    statements for ``code_kind`` and ``code_detail``.

    Returns:
        0 on success, 1 when the CSV file does not exist.
    """
    root = Path(__file__).resolve().parents[2]
    csv_path = root / "docs/종량제 관련 자료/종량제 개발목록/종량제_개발목록_20260127(기본코드 종류).csv"
    if not csv_path.exists():
        print("CSV not found:", csv_path, file=sys.stderr)
        return 1

    rows = read_csv_rows(csv_path)
    details: dict[str, list[tuple[str, str]]] = {letter: [] for letter in _KIND_LETTERS}

    # A kind-name row has the single letter in column 0 and its label in
    # column 1; rows whose second column mentions the detail-code header are
    # table headers, not kind names.
    kind_names: dict[str, str] = {}
    for fields in rows:
        if len(fields) < 2:
            continue
        letter, label = fields[0].strip(), fields[1].strip()
        if letter in details and label and "세부코드" not in label:
            kind_names[letter] = label

    for row_slice, letters in _DETAIL_BLOCKS:
        _collect_details(rows[row_slice], letters, details)

    for letter in details:
        details[letter] = dedup_pairs(details[letter])

    sql: list[str] = [
        "-- Sync missing rows from 종량제_개발목록_20260127(기본코드 종류).csv",
        "-- Generated by writable/tools/sync_basic_codes_from_csv.py",
        "SET NAMES utf8mb4;",
        "",
    ]

    # Kinds without a name row in the CSV fall back to the letter itself.
    for letter in _KIND_LETTERS:
        kind_name = _sql_quote(kind_names.get(letter, letter))
        sql.append(
            f"INSERT INTO `code_kind` (`ck_code`, `ck_name`, `ck_state`, `ck_regdate`) "
            f"SELECT '{letter}', '{kind_name}', 1, NOW() FROM DUAL "
            f"WHERE NOT EXISTS (SELECT 1 FROM `code_kind` c WHERE c.ck_code = '{letter}');"
        )
    sql.append("")

    for letter in _KIND_LETTERS:
        # cd_sort advances in steps of 10 (10, 20, 30, ...) per kind.
        for position, (code, name) in enumerate(details[letter], start=1):
            sort_value = position * 10
            code_sql = _sql_quote(code)
            name_sql = _sql_quote(name)
            sql.append(
                "INSERT INTO `code_detail` (`cd_ck_idx`, `cd_code`, `cd_name`, `cd_sort`, `cd_state`, `cd_regdate`) "
                f"SELECT k.ck_idx, '{code_sql}', '{name_sql}', {sort_value}, 1, NOW() FROM `code_kind` k "
                f"WHERE k.ck_code = '{letter}' AND NOT EXISTS ("
                "SELECT 1 FROM `code_detail` d WHERE d.cd_ck_idx = k.ck_idx AND d.cd_code = "
                f"'{code_sql}'"
                ");"
            )

    out_path = root / "writable/database/code_master_sync_from_csv.sql"
    # The target directory may be absent on a fresh checkout; create it
    # rather than failing inside write_text.
    out_path.parent.mkdir(parents=True, exist_ok=True)
    out_path.write_text("\n".join(sql) + "\n", encoding="utf-8")

    print("Wrote", out_path)
    for letter in _KIND_LETTERS:
        print(f" {letter}: {len(details[letter])} details (unique cd_code)")
    return 0
# Script entry point: run main() and use its return value as the process
# exit status.
if __name__ == "__main__":
    sys.exit(main())