Files
PLStatus/status_automation.py
2026-02-17 09:17:46 +03:00

637 lines
26 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Автоматическая смена статуса в мессенджере по маске IP.
Запускается при старте ОС, проверяет IP, ждёт процесс мессенджера,
через заданное время выполняет клики: иконка -> статус -> очистка -> ввод текста -> сохранить.
Работает на Windows и Linux.
pyinstaller --onefile --name PLStatus --clean --noconsole --icon icon.ico status_automation.py
"""
import sys
import socket
import time
import platform
import subprocess
import logging
from pathlib import Path
import json
import psutil
import pyautogui
import pyperclip
# Отключаем защиту pyautogui от выхода за край экрана (мессенджер может быть где угодно)
pyautogui.FAILSAFE = True
pyautogui.PAUSE = 0.15
CONFIG_NAME = "config.json"
LOG_FILE = "status_automation.log"
# При запуске из собранного exe (PyInstaller и т.п.) — папка с исполняемым файлом
SCRIPT_DIR = Path(sys.executable).resolve().parent if getattr(sys, "frozen", False) else Path(__file__).resolve().parent
def _setup_logging():
"""Настраивает логирование в файл (файл перезаписывается при каждом запуске, UTF-8)."""
log_path = SCRIPT_DIR / LOG_FILE
formatter = logging.Formatter(
"%(asctime)s [%(levelname)s] %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
)
handler = logging.FileHandler(log_path, mode="w", encoding="utf-8")
handler.setFormatter(formatter)
root = logging.getLogger()
root.setLevel(logging.INFO)
if root.handlers:
root.handlers.clear()
root.addHandler(handler)
return logging.getLogger(__name__)
log = _setup_logging()
def _app_get_os(defaults, base_key):
"""Возвращает значение из _app для текущей ОС: base_key + '_windows' или '_linux'."""
os_key = "windows" if platform.system() == "Windows" else "linux"
key_os = f"{base_key}_{os_key}"
return defaults.get(key_os) or defaults.get(base_key)
def load_config():
"""
Загружает конфиг из config.json.
Формат: ключи — маски IP (первые три октета), значение — настройки { proc_name, message, pause };
опциональные ключи: "_app" — общие настройки окна; "default" — профиль, если текущий IP не совпал ни с одной маской.
Возвращает (profile, ip_mask): профиль и ключ (маска IP или "default"), или (None, None) если конфиг невалиден.
"""
config_path = SCRIPT_DIR / CONFIG_NAME
if not config_path.exists():
log.error("Файл конфигурации не найден: %s", config_path)
sys.exit(1)
with open(config_path, "r", encoding="utf-8") as f:
raw = json.load(f)
if not isinstance(raw, dict):
log.error("Конфиг должен быть объектом (ключи — маски IP).")
sys.exit(1)
prefixes = get_local_ip_prefixes()
defaults = raw.pop("_app", None) or {}
fallback_block = raw.pop("default", None)
for ip_mask, block in raw.items():
if not isinstance(block, dict):
continue
if ip_mask.strip() not in prefixes:
continue
profile = _build_profile(block, defaults, ip_mask)
if profile is not None:
return profile, ip_mask
if isinstance(fallback_block, dict):
profile = _build_profile(fallback_block, defaults, "default")
if profile is not None:
log.info("Текущий IP не в конфиге — используем профиль по ключу default.")
return profile, "default"
return None, None
def _build_profile(block, defaults, label):
"""Собирает профиль из блока и defaults. При ошибке возвращает None и логирует."""
# Имя процесса и заголовок окна — только из _app, по текущей ОС
process_name = _app_get_os(defaults, "proc_name")
window_title = _app_get_os(defaults, "window_title")
profile = {
"window_title": window_title,
"icon_position": block.get("icon_position") or defaults.get("icon_position"),
"status_position": block.get("status_position") or defaults.get("status_position"),
"save_button_position": block.get("save_button_position") or defaults.get("save_button_position"),
"icon_position_rel": block.get("icon_position_rel") or defaults.get("icon_position_rel"),
"status_position_rel": block.get("status_position_rel") or defaults.get("status_position_rel"),
"save_button_position_rel": block.get("save_button_position_rel") or defaults.get("save_button_position_rel"),
"icon_image": block.get("icon_image") or defaults.get("icon_image"),
"status_image": block.get("status_image") or defaults.get("status_image"),
"save_button_image": block.get("save_button_image") or defaults.get("save_button_image"),
"image_confidence": block.get("image_confidence") or defaults.get("image_confidence") or 0.8,
"status_message": block.get("message", ""),
"process_name": process_name,
"startup_delay": int(block.get("pause", 60)),
}
if not profile.get("process_name"):
log.error("В _app укажите proc_name_windows и/или proc_name_linux для текущей ОС")
return None
if not profile.get("window_title"):
log.error("В _app укажите window_title_windows и/или window_title_linux для текущей ОС")
return None
# Для каждой точки клика: либо картинка (*_image), либо координаты (*_position или *_position_rel)
for key_img, key_px, key_rel in (
("icon_image", "icon_position", "icon_position_rel"),
("status_image", "status_position", "status_position_rel"),
("save_button_image", "save_button_position", "save_button_position_rel"),
):
has_img = bool(profile.get(key_img))
has_px = profile.get(key_px) is not None
has_rel = profile.get(key_rel) is not None
if not (has_img or has_px or has_rel):
log.error("В конфиге для %s (или в _app) укажите %s или %s или %s", label, key_img, key_px, key_rel)
return None
return profile
def get_local_ip_prefixes():
"""Возвращает множество префиксов IP (первые три октета) для всех IPv4-интерфейсов."""
prefixes = set()
try:
af_inet = getattr(psutil, "AF_INET", None) or socket.AF_INET
af_link = getattr(psutil, "AF_LINK", -1)
for name, addrs in psutil.net_if_addrs().items():
for addr in addrs:
if addr.family == af_link:
continue
if addr.family != af_inet:
continue
ip = getattr(addr, "address", None) or ""
if not ip or ip.startswith("127."):
continue
parts = ip.split(".")
if len(parts) == 4 and all(p.isdigit() for p in parts):
prefixes.add(".".join(parts[:3]))
except Exception as e:
log.warning("Ошибка при получении списка IP: %s", e)
print(prefixes)
return prefixes
def ip_matches_mask(ip_mask):
"""Проверяет, совпадает ли маска (первые три октета) с одним из локальных IP."""
ip_mask = (ip_mask or "").strip()
if not ip_mask:
return False
# Нормализуем маску до трёх октетов
parts = ip_mask.split(".")
if len(parts) >= 3:
prefix = ".".join(parts[:3])
else:
prefix = ip_mask
return prefix in get_local_ip_prefixes()
def wait_for_process(process_name, poll_interval=2):
"""Ждёт появления процесса с заданным именем. Возвращает PID или None при ошибке."""
name_lower = process_name.lower()
# На Windows "App.exe", но процесс может быть и без .exe в имени — проверяем оба варианта
log.info("Ожидание процесса: %s", process_name)
while True:
for proc in psutil.process_iter(["name", "pid"]):
try:
pname = (proc.info.get("name") or "").lower()
if pname == name_lower or pname == name_lower.replace(".exe", ""):
return proc.info["pid"]
except (psutil.NoSuchProcess, psutil.AccessDenied):
continue
time.sleep(poll_interval)
def _focus_window_windows_by_pid(pid):
"""Активирует главное окно процесса по PID (Windows, ctypes)."""
if platform.system() != "Windows":
return False
try:
import ctypes
from ctypes import wintypes
user32 = ctypes.windll.user32
found_hwnd = wintypes.HWND()
def enum_callback(hwnd, _pid):
if user32.IsWindowVisible(hwnd):
wpid = wintypes.DWORD()
user32.GetWindowThreadProcessId(hwnd, ctypes.byref(wpid))
if wpid.value == _pid:
found_hwnd.value = hwnd
return False # прекратить перебор
return True
WNDENUMPROC = ctypes.WINFUNCTYPE(wintypes.BOOL, wintypes.HWND, wintypes.LPARAM)
user32.EnumWindows(WNDENUMPROC(enum_callback), pid)
if found_hwnd.value:
user32.SetForegroundWindow(found_hwnd.value)
time.sleep(0.3)
return True
except Exception as e:
log.warning("Активация окна по PID не удалась: %s", e)
return False
def _get_window_rect_windows_by_pid(pid):
"""Возвращает (left, top, width, height) главного окна процесса (Windows)."""
if platform.system() != "Windows":
return None
try:
import ctypes
from ctypes import wintypes
user32 = ctypes.windll.user32
rect = wintypes.RECT()
found_hwnd = wintypes.HWND()
def enum_callback(hwnd, _pid):
if user32.IsWindowVisible(hwnd):
wpid = wintypes.DWORD()
user32.GetWindowThreadProcessId(hwnd, ctypes.byref(wpid))
if wpid.value == _pid:
found_hwnd.value = hwnd
return False
return True
WNDENUMPROC = ctypes.WINFUNCTYPE(wintypes.BOOL, wintypes.HWND, wintypes.LPARAM)
user32.EnumWindows(WNDENUMPROC(enum_callback), pid)
if found_hwnd.value and user32.GetWindowRect(found_hwnd.value, ctypes.byref(rect)):
return (rect.left, rect.top, rect.right - rect.left, rect.bottom - rect.top)
except Exception as e:
log.warning("Получение rect по PID не удалось: %s", e)
return None
def focus_window_windows(title):
"""Фокус окна по заголовку (Windows)."""
try:
import pygetwindow as gw
wins = gw.getWindowsWithTitle(title)
if not wins:
# Частичное совпадение
for w in gw.getAllWindows():
if title.lower() in (w.title or "").lower():
w.activate()
time.sleep(0.3)
return True
return False
wins[0].activate()
time.sleep(0.3)
return True
except Exception as e:
log.warning("Не удалось активировать окно (Windows): %s", e)
return False
def _focus_window_linux_by_pid(pid):
"""Активирует окно процесса по PID (Linux, wmctrl -lp)."""
try:
out = subprocess.run(
["wmctrl", "-lp"],
capture_output=True,
text=True,
timeout=5,
)
if out.returncode != 0:
return False
pid_str = str(pid)
for line in out.stdout.strip().splitlines():
parts = line.split(None, 3)
if len(parts) >= 3 and parts[2] == pid_str:
subprocess.run(["wmctrl", "-ia", parts[0]], capture_output=True, timeout=5)
time.sleep(0.3)
return True
except Exception as e:
log.warning("Активация окна по PID (Linux): %s", e)
return False
def _get_window_rect_linux_by_pid(pid):
"""Возвращает (left, top, width, height) окна процесса (Linux, wmctrl)."""
try:
out = subprocess.run(["wmctrl", "-lp"], capture_output=True, text=True, timeout=5)
if out.returncode != 0:
return None
pid_str = str(pid)
wid = None
for line in out.stdout.strip().splitlines():
parts = line.split(None, 3)
if len(parts) >= 3 and parts[2] == pid_str:
wid = parts[0]
break
if not wid:
return None
out2 = subprocess.run(["wmctrl", "-l", "-G"], capture_output=True, text=True, timeout=5)
if out2.returncode != 0:
return None
for line in out2.stdout.strip().splitlines():
parts = line.split(None, 7)
if len(parts) >= 6 and parts[0] == wid:
x, y, w, h = int(parts[2]), int(parts[3]), int(parts[4]), int(parts[5])
return (x, y, w, h)
except Exception as e:
log.warning("Получение rect по PID (Linux): %s", e)
return None
def focus_window_linux(title):
"""Фокус окна по заголовку (Linux, wmctrl)."""
try:
subprocess.run(
["wmctrl", "-a", title],
capture_output=True,
timeout=5,
check=False,
)
time.sleep(0.3)
return True
except FileNotFoundError:
log.warning("wmctrl не установлен. Установите: sudo apt install wmctrl (или аналог)")
return False
except subprocess.TimeoutExpired:
return False
except Exception as e:
log.warning("Ошибка активации окна (Linux): %s", e)
return False
def get_window_rect_windows(title):
"""Позиция и размер окна (Windows). Возвращает (left, top, width, height) или None."""
try:
import pygetwindow as gw
wins = gw.getWindowsWithTitle(title)
if not wins:
for w in gw.getAllWindows():
if title.lower() in (w.title or "").lower():
return (w.left, w.top, w.width, w.height)
return None
w = wins[0]
return (w.left, w.top, w.width, w.height)
except Exception as e:
log.warning("Ошибка получения позиции окна (Windows): %s", e)
return None
def get_window_rect_linux(title):
"""Позиция и размер окна (Linux, wmctrl -l -G). Возвращает (left, top, width, height) или None."""
try:
out = subprocess.run(
["wmctrl", "-l", "-G"],
capture_output=True,
text=True,
timeout=5,
)
if out.returncode != 0:
return None
for line in out.stdout.strip().splitlines():
# wmctrl -l -G: id desktop pid x y w h host title
parts = line.split(None, 7)
if len(parts) < 8:
continue
win_title = parts[7]
if title.lower() in win_title.lower():
x, y, w, h = int(parts[2]), int(parts[3]), int(parts[4]), int(parts[5])
return (x, y, w, h)
return None
except Exception as e:
log.warning("Ошибка получения позиции окна (Linux): %s", e)
return None
def focus_window(title, pid=None):
"""Активирует окно по заголовку; если не найдено и задан pid — по PID (Windows) или wmctrl (Linux)."""
if platform.system() == "Windows":
if focus_window_windows(title):
return True
if pid is not None and _focus_window_windows_by_pid(pid):
log.info("Окно активировано по PID процесса (заголовок не совпал).")
return True
return False
if focus_window_linux(title):
return True
if pid is not None and _focus_window_linux_by_pid(pid):
log.info("Окно активировано по PID процесса (заголовок не совпал).")
return True
return False
def get_window_rect(title, pid=None):
"""Позиция и размер окна по заголовку; если не найдено и задан pid — по PID (Windows/Linux)."""
if platform.system() == "Windows":
rect = get_window_rect_windows(title)
if rect is None and pid is not None:
rect = _get_window_rect_windows_by_pid(pid)
return rect
rect = get_window_rect_linux(title)
if rect is None and pid is not None:
rect = _get_window_rect_linux_by_pid(pid)
return rect
def click_relative(rect, relative_x, relative_y):
"""Клик в координатах относительно окна (rect = left, top, w, h)."""
abs_x = rect[0] + relative_x
abs_y = rect[1] + relative_y
pyautogui.click(abs_x, abs_y)
def _resolve_click_position(rect, pixel_pos, rel_pos):
"""
Возвращает (abs_x, abs_y) для клика.
rect = (left, top, width, height). Если заданы rel_pos (доли 01 от размера окна) — используем их;
иначе pixel_pos (смещение в пикселях от левого верхнего угла окна).
"""
if rect is None:
if pixel_pos is None:
return None
return (pixel_pos[0], pixel_pos[1])
left, top, width, height = rect[0], rect[1], rect[2], rect[3]
if rel_pos is not None and len(rel_pos) >= 2:
try:
rx, ry = float(rel_pos[0]), float(rel_pos[1])
return (left + rx * width, top + ry * height)
except (TypeError, ValueError):
pass
if pixel_pos is not None and len(pixel_pos) >= 2:
return (left + pixel_pos[0], top + pixel_pos[1])
return None
def _resolve_image_path(path):
"""Путь к файлу: если не абсолютный — относительно папки скрипта."""
if not path:
return None
p = Path(path)
if not p.is_absolute():
p = SCRIPT_DIR / p
return p if p.exists() else None
def _find_and_click_image_opencv(template_path, confidence=0.8):
"""
Поиск по картинке через OpenCV + mss (без pyscreeze/Pillow).
Делает скриншот через mss, ищет шаблон через cv2.matchTemplate, кликает в центр.
"""
try:
import numpy as np
import cv2
import mss
except ImportError as e:
log.warning("Поиск по картинке недоступен (нужны opencv-python, mss): %s", e)
return False
template = cv2.imread(str(template_path))
if template is None:
log.warning("OpenCV не смог прочитать изображение: %s", template_path)
return False
with mss.mss() as sct:
monitor = sct.monitors[0]
screenshot = sct.grab(monitor)
img = np.array(screenshot)
img_bgr = cv2.cvtColor(img, cv2.COLOR_BGRA2BGR)
result = cv2.matchTemplate(img_bgr, template, cv2.TM_CCOEFF_NORMED)
min_val, max_val, min_loc, max_loc = cv2.minMaxLoc(result)
if max_val < confidence:
return False
h, w = template.shape[:2]
center_x = max_loc[0] + w // 2
center_y = max_loc[1] + h // 2
pyautogui.click(center_x, center_y)
return True
def find_and_click_image(image_path, confidence=0.8, timeout=10, retries=3):
"""
Ищет изображение на экране, кликает в центр.
Сначала пробует pyautogui.locateOnScreen (pyscreeze + Pillow);
при ошибке импорта — резервно OpenCV + mss.
"""
resolved = _resolve_image_path(image_path)
if not resolved:
log.warning("Файл изображения не найден: %s", image_path)
return False
path_str = str(resolved)
use_opencv_fallback = False
for attempt in range(retries):
if not use_opencv_fallback:
try:
loc = pyautogui.locateOnScreen(path_str, confidence=confidence)
if loc is not None:
x = loc.left + loc.width // 2
y = loc.top + loc.height // 2
pyautogui.click(x, y)
return True
except TypeError:
try:
loc = pyautogui.locateOnScreen(path_str)
if loc is not None:
x = loc.left + loc.width // 2
y = loc.top + loc.height // 2
pyautogui.click(x, y)
return True
except Exception:
pass
use_opencv_fallback = True
except Exception as e:
err_text = str(e).lower()
if "pyscreeze" in err_text or "pillow" in err_text or "unable to import" in err_text:
use_opencv_fallback = True
else:
log.warning("Поиск по картинке %s (попытка %s): %s", image_path, attempt + 1, e)
if use_opencv_fallback:
if _find_and_click_image_opencv(resolved, confidence=confidence):
return True
if attempt < retries - 1:
time.sleep(timeout / retries)
continue
if attempt < retries - 1:
time.sleep(timeout / retries)
if use_opencv_fallback:
log.warning("Изображение на экране не найдено (OpenCV): %s", image_path)
else:
log.warning("Изображение на экране не найдено: %s", image_path)
return False
def set_status(config, pid=None):
"""Выполняет последовательность: фокус окна, клики, очистка, ввод текста, сохранить. pid — для резервного поиска окна по процессу."""
title = config["window_title"]
message = config["status_message"]
confidence = float(config.get("image_confidence", 0.8))
if not focus_window(title, pid=pid):
log.error("Не удалось активировать окно: %s", title)
return False
time.sleep(0.7)
rect = get_window_rect(title, pid=pid)
use_icon_img = config.get("icon_image")
use_status_img = config.get("status_image")
use_save_img = config.get("save_button_image")
# Иконка
if use_icon_img:
if not find_and_click_image(use_icon_img, confidence=confidence):
log.error("Не найдена картинка иконки: %s", use_icon_img)
return False
else:
icon_xy = _resolve_click_position(
rect, config.get("icon_position"), config.get("icon_position_rel")
)
if icon_xy is None:
log.error("Не заданы координаты иконки (icon_image или icon_position / icon_position_rel).")
return False
if not rect:
log.warning("Позиция окна не получена; координаты считаются абсолютными.")
pyautogui.click(icon_xy[0], icon_xy[1])
time.sleep(0.4)
# Поле статуса
if use_status_img:
if not find_and_click_image(use_status_img, confidence=confidence):
log.error("Не найдена картинка поля статуса: %s", use_status_img)
return False
else:
status_xy = _resolve_click_position(
rect, config.get("status_position"), config.get("status_position_rel")
)
if status_xy is None:
log.error("Не заданы координаты поля статуса (status_image или status_position / status_position_rel).")
return False
pyautogui.click(status_xy[0], status_xy[1])
time.sleep(0.5)
# Выделить всё и удалить (очистка текущего статуса)
pyautogui.hotkey("ctrl", "a")
time.sleep(0.3)
pyautogui.press("delete")
time.sleep(0.3)
# Ввод нового статуса (через буфер обмена)
pyperclip.copy(message)
pyautogui.hotkey("ctrl", "v")
time.sleep(0.3)
# Кнопка «Сохранить»
if use_save_img:
if not find_and_click_image(use_save_img, confidence=confidence):
log.error("Не найдена картинка кнопки «Сохранить»: %s", use_save_img)
return False
else:
save_xy = _resolve_click_position(
rect, config.get("save_button_position"), config.get("save_button_position_rel")
)
if save_xy is None:
log.error("Не заданы координаты кнопки сохранения (save_button_image или save_button_position / save_button_position_rel).")
return False
pyautogui.click(save_xy[0], save_xy[1])
log.info("Статус установлен: %s", message)
return True
def main():
config, ip_mask = load_config()
if config is None:
log.info("Текущий IP не совпадает ни с одной маской из конфига — выход без действий.")
return 0
process_name = config["process_name"]
startup_delay = config["startup_delay"]
log.info("IP совпадает с маской %s, ожидание процесса %s...", ip_mask, process_name)
pid = wait_for_process(process_name)
log.info("Процесс запущен (PID %s), ожидание %s сек...", pid, startup_delay)
time.sleep(startup_delay)
set_status(config, pid=pid)
return 0
if __name__ == "__main__":
sys.exit(main())