238 lines
7.3 KiB
Python
Executable File
238 lines
7.3 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
|
|
import os
|
|
import re
|
|
import shutil
|
|
import logging
|
|
from pathlib import Path
|
|
import pwd
|
|
import grp
|
|
import filecmp
|
|
|
|
# ==== CONFIG ====

# Source (Arul's personal space) and destination (professional site) trees.
ARUL_SPACE_CONTENT = "/var/www/html/arulbalaji.xyz/content/journal"
PRO_CONTENT = "/var/www/html/professional-site/content/post"
ARUL_SPACE_MEDIA = "/var/www/html/arulbalaji.xyz/static/media"
PRO_MEDIA = "/var/www/html/professional-site/static/media"
# Plain-text summaries (shortcodes stripped) for each synced post land here.
SUMMARY_FOLDER = "/var/www/html/professional-site/assets/summaries"
LOG_FILE = "/var/log/hugo_sync.log"
# Only posts whose frontmatter tags include one of these are synced.
VALID_TAGS = {"Tech", "Tech tutorials"}
OWNER_USER = "arul"  # Set to None to skip chown

# ==== LOGGER SETUP ====

# Named logger writing the same format to both the log file and the console.
# NOTE(review): FileHandler opens LOG_FILE at import time — the script must be
# able to write /var/log/hugo_sync.log (typically requires root) or import fails.
logger = logging.getLogger("SyncLogger")
logger.setLevel(logging.INFO)

formatter = logging.Formatter('%(asctime)s [%(levelname)s] %(message)s')

file_handler = logging.FileHandler(LOG_FILE)
file_handler.setFormatter(formatter)
logger.addHandler(file_handler)

console_handler = logging.StreamHandler()
console_handler.setFormatter(formatter)
logger.addHandler(console_handler)
|
|
|
|
# ==== HELPERS ====
|
|
|
|
def parse_frontmatter(lines):
    """Parse a '---'-delimited frontmatter block into key/value pairs.

    Returns (front, body_start): a dict mapping stripped keys to values
    (with surrounding double quotes removed) and the index of the first
    body line after the closing '---' (0 when no closing delimiter is
    found). Lines before the opening '---' are ignored.
    """
    meta = {}
    start = 0
    inside = False
    for idx, raw in enumerate(lines):
        if raw.strip() == "---":
            if inside:
                start = idx + 1
                break
            inside = True
            continue
        if inside and ":" in raw:
            field, _, rest = raw.partition(":")
            meta[field.strip()] = rest.strip().strip('"')
    return meta, start
|
|
|
|
def extract_meta_comments(lines):
    """Pull pro-site metadata out of marker comment lines.

    Scans for lines containing '#pro-site description:' and
    '#pro-site featured_image:' and returns (description, featured) —
    each the stripped text after its marker, or None when absent.
    The last occurrence of a marker wins.

    Fix: split on the marker itself instead of on the first ':' in the
    line, so a colon appearing before the marker (e.g. inside a
    preceding comment) no longer corrupts the extracted value.
    """
    description = None
    featured = None
    for line in lines:
        if "#pro-site description:" in line:
            description = line.split("#pro-site description:", 1)[1].strip()
        if "#pro-site featured_image:" in line:
            featured = line.split("#pro-site featured_image:", 1)[1].strip()
    return description, featured
|
|
|
|
def has_valid_tags(front):
    """Return True when the frontmatter 'tags' value names at least one
    tag from VALID_TAGS. Tags are the double-quoted items on the line
    (e.g. tags: ["Tech", "Life"]); a missing 'tags' key yields False.
    """
    quoted = re.findall(r'"(.*?)"', front.get("tags", ""))
    return not VALID_TAGS.isdisjoint(quoted)
|
|
|
|
def chown_to_user(file_path, username):
    """Best-effort chown of *file_path* to *username*.

    No-op when *username* is falsy; any failure is logged as a warning,
    never raised.

    Fix: use the user's primary group id from the password database
    (pw_gid) instead of grp.getgrnam(username) — the old lookup assumed
    a group with the same name as the user exists and failed on systems
    without per-user groups.
    """
    if not username:
        return
    try:
        record = pwd.getpwnam(username)
        os.chown(file_path, record.pw_uid, record.pw_gid)
    except Exception as e:
        logger.warning(f"Could not chown {file_path}: {e}")
|
|
|
|
def strip_shortcodes(content):
    """Remove Hugo shortcodes from *content* and return the result.

    Generalized: matches both shortcode forms — angle-bracket
    ({{< ... >}}) and percent ({{% ... %}}); the previous pattern only
    removed the angle-bracket form. re.S keeps the old ability to match
    shortcodes that span lines; the lazy quantifier removes adjacent
    shortcodes individually.
    """
    return re.sub(r"{{[<%].*?[>%]}}", "", content, flags=re.S)
|
|
|
|
def files_are_identical(file1, file2):
    """True when both files exist and their contents match byte-for-byte.

    A cheap size comparison short-circuits before the full content
    compare; any OS error is logged and treated as "not identical".
    """
    try:
        same_size = os.path.getsize(file1) == os.path.getsize(file2)
        return same_size and filecmp.cmp(file1, file2, shallow=False)
    except Exception as e:
        logger.warning(f"Error comparing files {file1} and {file2}: {e}")
        return False
|
|
|
|
# ==== TRANSFORMATION ====
|
|
|
|
def transform_file(file_path):
    """Transform one journal markdown file for the professional site.

    Returns (full_markdown, plain_text_summary), or (None, None) when
    the file is hidden, unreadable, or not tagged with any VALID_TAGS.
    The rewritten frontmatter keeps only date / description /
    featured_image / title (description and featured image come from
    '#pro-site' marker comments in the body); the summary is the body
    with Hugo shortcodes stripped.

    Fixes: (1) the hidden-path check now runs BEFORE the file is read,
    instead of reading the file and throwing the work away; (2) all
    OSErrors (missing file, permission, etc.) are treated as non-fatal,
    not just PermissionError, so one bad file can't crash a sync pass;
    (3) str paths are accepted via Path coercion.
    """
    file_path = Path(file_path)

    # Skip anything inside a hidden file or directory (editor swap
    # files, .git, ...) without touching the filesystem.
    if any(part.startswith('.') for part in file_path.parts):
        return None, None

    try:
        with open(file_path, "r") as f:
            lines = f.readlines()
    except OSError as e:
        logger.error(f"Cannot read {file_path}: {e}")
        return None, None

    front, body_start = parse_frontmatter(lines)
    if not has_valid_tags(front):
        return None, None

    description, featured_image = extract_meta_comments(lines)
    title = front.get("title", "Untitled")
    raw_date = front.get("date", "1970-01-01")
    # Frontmatter dates are bare YYYY-MM-DD; pin midnight IST for Hugo.
    formatted_date = f"{raw_date}T00:00:00+05:30"

    body = "".join(lines[body_start:]).strip()

    new_frontmatter = f"""---
date: {formatted_date}
description: "{description or ''}"
featured_image: "{featured_image or ''}"
title: "{title}"
---
"""

    full_markdown = new_frontmatter + "\n\n" + body
    plain_text_summary = strip_shortcodes(body).strip()

    return full_markdown, plain_text_summary
|
|
|
|
# ==== MEDIA FOLDER COMPARISON ====
|
|
|
|
def are_folders_identical(folder1, folder2):
    """True when both folders exist, contain exactly the same relative
    file paths, and every paired file is byte-identical."""
    if not (os.path.exists(folder1) and os.path.exists(folder2)):
        return False

    def relative_files(base):
        # Set of every file path under *base*, relative to *base*.
        return {
            os.path.relpath(os.path.join(root, name), base)
            for root, _, names in os.walk(base)
            for name in names
        }

    files1 = relative_files(folder1)
    if files1 != relative_files(folder2):
        return False

    return all(
        files_are_identical(os.path.join(folder1, rel), os.path.join(folder2, rel))
        for rel in files1
    )
|
|
|
|
# ==== MAIN SYNC CHECK ====
|
|
|
|
def check_already_synced():
    """Return True when every eligible source markdown already exists on
    the professional site with exactly the transformed content.

    Files that transform_file rejects (hidden, unreadable, wrong tags)
    are skipped; an unreadable destination file counts as out-of-sync.
    """
    destination = Path(PRO_CONTENT)
    for source in Path(ARUL_SPACE_CONTENT).rglob("*.md"):
        expected, _ = transform_file(source)
        if not expected:
            continue  # not eligible for sync — nothing to compare
        target = destination / source.name
        if not target.exists():
            return False
        try:
            current = target.read_text()
        except Exception as e:
            logger.warning(f"Could not read {target}: {e}")
            return False
        if current != expected:
            return False
    return True
|
|
|
|
# ==== MEDIA SYNC ====
|
|
|
|
def copy_media_folder():
    """Mirror the media folder from Arul's space to the professional site.

    The destination is removed first so files deleted at the source do
    not linger, then the tree is copied and (when OWNER_USER is set)
    chowned to OWNER_USER. Warns and returns when the source folder is
    missing; any copy failure is logged as an error, never raised.

    Fix: chown now covers the media root and every DIRECTORY as well as
    every file — the previous version only chowned files, leaving
    root-owned directories behind after the rmtree/copytree.
    """
    if not os.path.exists(ARUL_SPACE_MEDIA):
        logger.warning(f"Media folder not found in Arul's space at {ARUL_SPACE_MEDIA}")
        return
    try:
        # Full mirror: wipe the destination so stale files disappear.
        if os.path.exists(PRO_MEDIA):
            shutil.rmtree(PRO_MEDIA)
        shutil.copytree(ARUL_SPACE_MEDIA, PRO_MEDIA, dirs_exist_ok=True)
        if OWNER_USER:
            chown_to_user(PRO_MEDIA, OWNER_USER)
            for root, dirs, files in os.walk(PRO_MEDIA):
                for name in dirs + files:
                    chown_to_user(os.path.join(root, name), OWNER_USER)
        logger.info("📁 Media folder synced from Arul's space → professional site")
    except Exception as e:
        logger.error(f"Failed to sync media folder from Arul's space: {e}")
|
|
|
|
# ==== MAIN SYNC ====
|
|
|
|
def sync_markdowns():
    """Transform and copy every eligible source markdown to the
    professional site, writing a plain-text summary alongside each.

    Returns the list of synced file names. Per-file failures are logged
    and skipped so one bad post cannot abort the whole pass.
    """
    content_dir = Path(PRO_CONTENT)
    summary_dir = Path(SUMMARY_FOLDER)
    content_dir.mkdir(parents=True, exist_ok=True)
    summary_dir.mkdir(parents=True, exist_ok=True)

    synced = []
    for md_file in Path(ARUL_SPACE_CONTENT).rglob("*.md"):
        try:
            transformed, summary = transform_file(md_file)
            if not transformed:
                continue  # rejected: hidden, unreadable, or wrong tags

            out_path = content_dir / md_file.name
            with open(out_path, "w") as out_file:
                out_file.write(transformed)
            if OWNER_USER:
                chown_to_user(out_path, OWNER_USER)
            synced.append(str(md_file.name))

            summary_path = summary_dir / (md_file.stem + ".txt")
            with open(summary_path, "w") as sum_file:
                sum_file.write(summary)
            if OWNER_USER:
                chown_to_user(summary_path, OWNER_USER)
        except Exception as e:
            logger.error(f"Error processing {md_file.name}: {e}")
    return synced
|
|
|
|
# ==== EXECUTION ====
|
|
|
|
if __name__ == "__main__":
    # Fast path: skip the whole sync when both the transformed content
    # and the media tree already match the professional site.
    if check_already_synced() and are_folders_identical(ARUL_SPACE_MEDIA, PRO_MEDIA):
        logger.info("☑️ Already in sync. No changes needed.")
    else:
        synced_files = sync_markdowns()
        copy_media_folder()

        # Report exactly what was copied (or that nothing qualified).
        if synced_files:
            for f in synced_files:
                logger.info(f"Synced: {f}")
            logger.info(f"✅ Synced files: {', '.join(synced_files)}")
        else:
            logger.info("☑️ No new valid tech/tutorial markdowns to sync.")
|
|
|