This commit is contained in:
2026-02-17 09:17:46 +03:00
commit b9930dbe59
16 changed files with 1089 additions and 0 deletions

636
status_automation.py Normal file
View File

@@ -0,0 +1,636 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Автоматическая смена статуса в мессенджере по маске IP.
Запускается при старте ОС, проверяет IP, ждёт процесс мессенджера,
через заданное время выполняет клики: иконка -> статус -> очистка -> ввод текста -> сохранить.
Работает на Windows и Linux.
pyinstaller --onefile --name PLStatus --clean --noconsole --icon icon.ico status_automation.py
"""
import sys
import socket
import time
import platform
import subprocess
import logging
from pathlib import Path
import json
import psutil
import pyautogui
import pyperclip
# Отключаем защиту pyautogui от выхода за край экрана (мессенджер может быть где угодно)
pyautogui.FAILSAFE = True
pyautogui.PAUSE = 0.15
CONFIG_NAME = "config.json"
LOG_FILE = "status_automation.log"
# При запуске из собранного exe (PyInstaller и т.п.) — папка с исполняемым файлом
SCRIPT_DIR = Path(sys.executable).resolve().parent if getattr(sys, "frozen", False) else Path(__file__).resolve().parent
def _setup_logging():
"""Настраивает логирование в файл (файл перезаписывается при каждом запуске, UTF-8)."""
log_path = SCRIPT_DIR / LOG_FILE
formatter = logging.Formatter(
"%(asctime)s [%(levelname)s] %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
)
handler = logging.FileHandler(log_path, mode="w", encoding="utf-8")
handler.setFormatter(formatter)
root = logging.getLogger()
root.setLevel(logging.INFO)
if root.handlers:
root.handlers.clear()
root.addHandler(handler)
return logging.getLogger(__name__)
log = _setup_logging()
def _app_get_os(defaults, base_key):
"""Возвращает значение из _app для текущей ОС: base_key + '_windows' или '_linux'."""
os_key = "windows" if platform.system() == "Windows" else "linux"
key_os = f"{base_key}_{os_key}"
return defaults.get(key_os) or defaults.get(base_key)
def load_config():
"""
Загружает конфиг из config.json.
Формат: ключи — маски IP (первые три октета), значение — настройки { proc_name, message, pause };
опциональные ключи: "_app" — общие настройки окна; "default" — профиль, если текущий IP не совпал ни с одной маской.
Возвращает (profile, ip_mask): профиль и ключ (маска IP или "default"), или (None, None) если конфиг невалиден.
"""
config_path = SCRIPT_DIR / CONFIG_NAME
if not config_path.exists():
log.error("Файл конфигурации не найден: %s", config_path)
sys.exit(1)
with open(config_path, "r", encoding="utf-8") as f:
raw = json.load(f)
if not isinstance(raw, dict):
log.error("Конфиг должен быть объектом (ключи — маски IP).")
sys.exit(1)
prefixes = get_local_ip_prefixes()
defaults = raw.pop("_app", None) or {}
fallback_block = raw.pop("default", None)
for ip_mask, block in raw.items():
if not isinstance(block, dict):
continue
if ip_mask.strip() not in prefixes:
continue
profile = _build_profile(block, defaults, ip_mask)
if profile is not None:
return profile, ip_mask
if isinstance(fallback_block, dict):
profile = _build_profile(fallback_block, defaults, "default")
if profile is not None:
log.info("Текущий IP не в конфиге — используем профиль по ключу default.")
return profile, "default"
return None, None
def _build_profile(block, defaults, label):
"""Собирает профиль из блока и defaults. При ошибке возвращает None и логирует."""
# Имя процесса и заголовок окна — только из _app, по текущей ОС
process_name = _app_get_os(defaults, "proc_name")
window_title = _app_get_os(defaults, "window_title")
profile = {
"window_title": window_title,
"icon_position": block.get("icon_position") or defaults.get("icon_position"),
"status_position": block.get("status_position") or defaults.get("status_position"),
"save_button_position": block.get("save_button_position") or defaults.get("save_button_position"),
"icon_position_rel": block.get("icon_position_rel") or defaults.get("icon_position_rel"),
"status_position_rel": block.get("status_position_rel") or defaults.get("status_position_rel"),
"save_button_position_rel": block.get("save_button_position_rel") or defaults.get("save_button_position_rel"),
"icon_image": block.get("icon_image") or defaults.get("icon_image"),
"status_image": block.get("status_image") or defaults.get("status_image"),
"save_button_image": block.get("save_button_image") or defaults.get("save_button_image"),
"image_confidence": block.get("image_confidence") or defaults.get("image_confidence") or 0.8,
"status_message": block.get("message", ""),
"process_name": process_name,
"startup_delay": int(block.get("pause", 60)),
}
if not profile.get("process_name"):
log.error("В _app укажите proc_name_windows и/или proc_name_linux для текущей ОС")
return None
if not profile.get("window_title"):
log.error("В _app укажите window_title_windows и/или window_title_linux для текущей ОС")
return None
# Для каждой точки клика: либо картинка (*_image), либо координаты (*_position или *_position_rel)
for key_img, key_px, key_rel in (
("icon_image", "icon_position", "icon_position_rel"),
("status_image", "status_position", "status_position_rel"),
("save_button_image", "save_button_position", "save_button_position_rel"),
):
has_img = bool(profile.get(key_img))
has_px = profile.get(key_px) is not None
has_rel = profile.get(key_rel) is not None
if not (has_img or has_px or has_rel):
log.error("В конфиге для %s (или в _app) укажите %s или %s или %s", label, key_img, key_px, key_rel)
return None
return profile
def get_local_ip_prefixes():
"""Возвращает множество префиксов IP (первые три октета) для всех IPv4-интерфейсов."""
prefixes = set()
try:
af_inet = getattr(psutil, "AF_INET", None) or socket.AF_INET
af_link = getattr(psutil, "AF_LINK", -1)
for name, addrs in psutil.net_if_addrs().items():
for addr in addrs:
if addr.family == af_link:
continue
if addr.family != af_inet:
continue
ip = getattr(addr, "address", None) or ""
if not ip or ip.startswith("127."):
continue
parts = ip.split(".")
if len(parts) == 4 and all(p.isdigit() for p in parts):
prefixes.add(".".join(parts[:3]))
except Exception as e:
log.warning("Ошибка при получении списка IP: %s", e)
print(prefixes)
return prefixes
def ip_matches_mask(ip_mask):
"""Проверяет, совпадает ли маска (первые три октета) с одним из локальных IP."""
ip_mask = (ip_mask or "").strip()
if not ip_mask:
return False
# Нормализуем маску до трёх октетов
parts = ip_mask.split(".")
if len(parts) >= 3:
prefix = ".".join(parts[:3])
else:
prefix = ip_mask
return prefix in get_local_ip_prefixes()
def wait_for_process(process_name, poll_interval=2):
"""Ждёт появления процесса с заданным именем. Возвращает PID или None при ошибке."""
name_lower = process_name.lower()
# На Windows "App.exe", но процесс может быть и без .exe в имени — проверяем оба варианта
log.info("Ожидание процесса: %s", process_name)
while True:
for proc in psutil.process_iter(["name", "pid"]):
try:
pname = (proc.info.get("name") or "").lower()
if pname == name_lower or pname == name_lower.replace(".exe", ""):
return proc.info["pid"]
except (psutil.NoSuchProcess, psutil.AccessDenied):
continue
time.sleep(poll_interval)
def _focus_window_windows_by_pid(pid):
"""Активирует главное окно процесса по PID (Windows, ctypes)."""
if platform.system() != "Windows":
return False
try:
import ctypes
from ctypes import wintypes
user32 = ctypes.windll.user32
found_hwnd = wintypes.HWND()
def enum_callback(hwnd, _pid):
if user32.IsWindowVisible(hwnd):
wpid = wintypes.DWORD()
user32.GetWindowThreadProcessId(hwnd, ctypes.byref(wpid))
if wpid.value == _pid:
found_hwnd.value = hwnd
return False # прекратить перебор
return True
WNDENUMPROC = ctypes.WINFUNCTYPE(wintypes.BOOL, wintypes.HWND, wintypes.LPARAM)
user32.EnumWindows(WNDENUMPROC(enum_callback), pid)
if found_hwnd.value:
user32.SetForegroundWindow(found_hwnd.value)
time.sleep(0.3)
return True
except Exception as e:
log.warning("Активация окна по PID не удалась: %s", e)
return False
def _get_window_rect_windows_by_pid(pid):
"""Возвращает (left, top, width, height) главного окна процесса (Windows)."""
if platform.system() != "Windows":
return None
try:
import ctypes
from ctypes import wintypes
user32 = ctypes.windll.user32
rect = wintypes.RECT()
found_hwnd = wintypes.HWND()
def enum_callback(hwnd, _pid):
if user32.IsWindowVisible(hwnd):
wpid = wintypes.DWORD()
user32.GetWindowThreadProcessId(hwnd, ctypes.byref(wpid))
if wpid.value == _pid:
found_hwnd.value = hwnd
return False
return True
WNDENUMPROC = ctypes.WINFUNCTYPE(wintypes.BOOL, wintypes.HWND, wintypes.LPARAM)
user32.EnumWindows(WNDENUMPROC(enum_callback), pid)
if found_hwnd.value and user32.GetWindowRect(found_hwnd.value, ctypes.byref(rect)):
return (rect.left, rect.top, rect.right - rect.left, rect.bottom - rect.top)
except Exception as e:
log.warning("Получение rect по PID не удалось: %s", e)
return None
def focus_window_windows(title):
"""Фокус окна по заголовку (Windows)."""
try:
import pygetwindow as gw
wins = gw.getWindowsWithTitle(title)
if not wins:
# Частичное совпадение
for w in gw.getAllWindows():
if title.lower() in (w.title or "").lower():
w.activate()
time.sleep(0.3)
return True
return False
wins[0].activate()
time.sleep(0.3)
return True
except Exception as e:
log.warning("Не удалось активировать окно (Windows): %s", e)
return False
def _focus_window_linux_by_pid(pid):
"""Активирует окно процесса по PID (Linux, wmctrl -lp)."""
try:
out = subprocess.run(
["wmctrl", "-lp"],
capture_output=True,
text=True,
timeout=5,
)
if out.returncode != 0:
return False
pid_str = str(pid)
for line in out.stdout.strip().splitlines():
parts = line.split(None, 3)
if len(parts) >= 3 and parts[2] == pid_str:
subprocess.run(["wmctrl", "-ia", parts[0]], capture_output=True, timeout=5)
time.sleep(0.3)
return True
except Exception as e:
log.warning("Активация окна по PID (Linux): %s", e)
return False
def _get_window_rect_linux_by_pid(pid):
"""Возвращает (left, top, width, height) окна процесса (Linux, wmctrl)."""
try:
out = subprocess.run(["wmctrl", "-lp"], capture_output=True, text=True, timeout=5)
if out.returncode != 0:
return None
pid_str = str(pid)
wid = None
for line in out.stdout.strip().splitlines():
parts = line.split(None, 3)
if len(parts) >= 3 and parts[2] == pid_str:
wid = parts[0]
break
if not wid:
return None
out2 = subprocess.run(["wmctrl", "-l", "-G"], capture_output=True, text=True, timeout=5)
if out2.returncode != 0:
return None
for line in out2.stdout.strip().splitlines():
parts = line.split(None, 7)
if len(parts) >= 6 and parts[0] == wid:
x, y, w, h = int(parts[2]), int(parts[3]), int(parts[4]), int(parts[5])
return (x, y, w, h)
except Exception as e:
log.warning("Получение rect по PID (Linux): %s", e)
return None
def focus_window_linux(title):
"""Фокус окна по заголовку (Linux, wmctrl)."""
try:
subprocess.run(
["wmctrl", "-a", title],
capture_output=True,
timeout=5,
check=False,
)
time.sleep(0.3)
return True
except FileNotFoundError:
log.warning("wmctrl не установлен. Установите: sudo apt install wmctrl (или аналог)")
return False
except subprocess.TimeoutExpired:
return False
except Exception as e:
log.warning("Ошибка активации окна (Linux): %s", e)
return False
def get_window_rect_windows(title):
"""Позиция и размер окна (Windows). Возвращает (left, top, width, height) или None."""
try:
import pygetwindow as gw
wins = gw.getWindowsWithTitle(title)
if not wins:
for w in gw.getAllWindows():
if title.lower() in (w.title or "").lower():
return (w.left, w.top, w.width, w.height)
return None
w = wins[0]
return (w.left, w.top, w.width, w.height)
except Exception as e:
log.warning("Ошибка получения позиции окна (Windows): %s", e)
return None
def get_window_rect_linux(title):
"""Позиция и размер окна (Linux, wmctrl -l -G). Возвращает (left, top, width, height) или None."""
try:
out = subprocess.run(
["wmctrl", "-l", "-G"],
capture_output=True,
text=True,
timeout=5,
)
if out.returncode != 0:
return None
for line in out.stdout.strip().splitlines():
# wmctrl -l -G: id desktop pid x y w h host title
parts = line.split(None, 7)
if len(parts) < 8:
continue
win_title = parts[7]
if title.lower() in win_title.lower():
x, y, w, h = int(parts[2]), int(parts[3]), int(parts[4]), int(parts[5])
return (x, y, w, h)
return None
except Exception as e:
log.warning("Ошибка получения позиции окна (Linux): %s", e)
return None
def focus_window(title, pid=None):
"""Активирует окно по заголовку; если не найдено и задан pid — по PID (Windows) или wmctrl (Linux)."""
if platform.system() == "Windows":
if focus_window_windows(title):
return True
if pid is not None and _focus_window_windows_by_pid(pid):
log.info("Окно активировано по PID процесса (заголовок не совпал).")
return True
return False
if focus_window_linux(title):
return True
if pid is not None and _focus_window_linux_by_pid(pid):
log.info("Окно активировано по PID процесса (заголовок не совпал).")
return True
return False
def get_window_rect(title, pid=None):
"""Позиция и размер окна по заголовку; если не найдено и задан pid — по PID (Windows/Linux)."""
if platform.system() == "Windows":
rect = get_window_rect_windows(title)
if rect is None and pid is not None:
rect = _get_window_rect_windows_by_pid(pid)
return rect
rect = get_window_rect_linux(title)
if rect is None and pid is not None:
rect = _get_window_rect_linux_by_pid(pid)
return rect
def click_relative(rect, relative_x, relative_y):
"""Клик в координатах относительно окна (rect = left, top, w, h)."""
abs_x = rect[0] + relative_x
abs_y = rect[1] + relative_y
pyautogui.click(abs_x, abs_y)
def _resolve_click_position(rect, pixel_pos, rel_pos):
"""
Возвращает (abs_x, abs_y) для клика.
rect = (left, top, width, height). Если заданы rel_pos (доли 01 от размера окна) — используем их;
иначе pixel_pos (смещение в пикселях от левого верхнего угла окна).
"""
if rect is None:
if pixel_pos is None:
return None
return (pixel_pos[0], pixel_pos[1])
left, top, width, height = rect[0], rect[1], rect[2], rect[3]
if rel_pos is not None and len(rel_pos) >= 2:
try:
rx, ry = float(rel_pos[0]), float(rel_pos[1])
return (left + rx * width, top + ry * height)
except (TypeError, ValueError):
pass
if pixel_pos is not None and len(pixel_pos) >= 2:
return (left + pixel_pos[0], top + pixel_pos[1])
return None
def _resolve_image_path(path):
"""Путь к файлу: если не абсолютный — относительно папки скрипта."""
if not path:
return None
p = Path(path)
if not p.is_absolute():
p = SCRIPT_DIR / p
return p if p.exists() else None
def _find_and_click_image_opencv(template_path, confidence=0.8):
"""
Поиск по картинке через OpenCV + mss (без pyscreeze/Pillow).
Делает скриншот через mss, ищет шаблон через cv2.matchTemplate, кликает в центр.
"""
try:
import numpy as np
import cv2
import mss
except ImportError as e:
log.warning("Поиск по картинке недоступен (нужны opencv-python, mss): %s", e)
return False
template = cv2.imread(str(template_path))
if template is None:
log.warning("OpenCV не смог прочитать изображение: %s", template_path)
return False
with mss.mss() as sct:
monitor = sct.monitors[0]
screenshot = sct.grab(monitor)
img = np.array(screenshot)
img_bgr = cv2.cvtColor(img, cv2.COLOR_BGRA2BGR)
result = cv2.matchTemplate(img_bgr, template, cv2.TM_CCOEFF_NORMED)
min_val, max_val, min_loc, max_loc = cv2.minMaxLoc(result)
if max_val < confidence:
return False
h, w = template.shape[:2]
center_x = max_loc[0] + w // 2
center_y = max_loc[1] + h // 2
pyautogui.click(center_x, center_y)
return True
def find_and_click_image(image_path, confidence=0.8, timeout=10, retries=3):
"""
Ищет изображение на экране, кликает в центр.
Сначала пробует pyautogui.locateOnScreen (pyscreeze + Pillow);
при ошибке импорта — резервно OpenCV + mss.
"""
resolved = _resolve_image_path(image_path)
if not resolved:
log.warning("Файл изображения не найден: %s", image_path)
return False
path_str = str(resolved)
use_opencv_fallback = False
for attempt in range(retries):
if not use_opencv_fallback:
try:
loc = pyautogui.locateOnScreen(path_str, confidence=confidence)
if loc is not None:
x = loc.left + loc.width // 2
y = loc.top + loc.height // 2
pyautogui.click(x, y)
return True
except TypeError:
try:
loc = pyautogui.locateOnScreen(path_str)
if loc is not None:
x = loc.left + loc.width // 2
y = loc.top + loc.height // 2
pyautogui.click(x, y)
return True
except Exception:
pass
use_opencv_fallback = True
except Exception as e:
err_text = str(e).lower()
if "pyscreeze" in err_text or "pillow" in err_text or "unable to import" in err_text:
use_opencv_fallback = True
else:
log.warning("Поиск по картинке %s (попытка %s): %s", image_path, attempt + 1, e)
if use_opencv_fallback:
if _find_and_click_image_opencv(resolved, confidence=confidence):
return True
if attempt < retries - 1:
time.sleep(timeout / retries)
continue
if attempt < retries - 1:
time.sleep(timeout / retries)
if use_opencv_fallback:
log.warning("Изображение на экране не найдено (OpenCV): %s", image_path)
else:
log.warning("Изображение на экране не найдено: %s", image_path)
return False
def set_status(config, pid=None):
"""Выполняет последовательность: фокус окна, клики, очистка, ввод текста, сохранить. pid — для резервного поиска окна по процессу."""
title = config["window_title"]
message = config["status_message"]
confidence = float(config.get("image_confidence", 0.8))
if not focus_window(title, pid=pid):
log.error("Не удалось активировать окно: %s", title)
return False
time.sleep(0.7)
rect = get_window_rect(title, pid=pid)
use_icon_img = config.get("icon_image")
use_status_img = config.get("status_image")
use_save_img = config.get("save_button_image")
# Иконка
if use_icon_img:
if not find_and_click_image(use_icon_img, confidence=confidence):
log.error("Не найдена картинка иконки: %s", use_icon_img)
return False
else:
icon_xy = _resolve_click_position(
rect, config.get("icon_position"), config.get("icon_position_rel")
)
if icon_xy is None:
log.error("Не заданы координаты иконки (icon_image или icon_position / icon_position_rel).")
return False
if not rect:
log.warning("Позиция окна не получена; координаты считаются абсолютными.")
pyautogui.click(icon_xy[0], icon_xy[1])
time.sleep(0.4)
# Поле статуса
if use_status_img:
if not find_and_click_image(use_status_img, confidence=confidence):
log.error("Не найдена картинка поля статуса: %s", use_status_img)
return False
else:
status_xy = _resolve_click_position(
rect, config.get("status_position"), config.get("status_position_rel")
)
if status_xy is None:
log.error("Не заданы координаты поля статуса (status_image или status_position / status_position_rel).")
return False
pyautogui.click(status_xy[0], status_xy[1])
time.sleep(0.5)
# Выделить всё и удалить (очистка текущего статуса)
pyautogui.hotkey("ctrl", "a")
time.sleep(0.3)
pyautogui.press("delete")
time.sleep(0.3)
# Ввод нового статуса (через буфер обмена)
pyperclip.copy(message)
pyautogui.hotkey("ctrl", "v")
time.sleep(0.3)
# Кнопка «Сохранить»
if use_save_img:
if not find_and_click_image(use_save_img, confidence=confidence):
log.error("Не найдена картинка кнопки «Сохранить»: %s", use_save_img)
return False
else:
save_xy = _resolve_click_position(
rect, config.get("save_button_position"), config.get("save_button_position_rel")
)
if save_xy is None:
log.error("Не заданы координаты кнопки сохранения (save_button_image или save_button_position / save_button_position_rel).")
return False
pyautogui.click(save_xy[0], save_xy[1])
log.info("Статус установлен: %s", message)
return True
def main():
config, ip_mask = load_config()
if config is None:
log.info("Текущий IP не совпадает ни с одной маской из конфига — выход без действий.")
return 0
process_name = config["process_name"]
startup_delay = config["startup_delay"]
log.info("IP совпадает с маской %s, ожидание процесса %s...", ip_mask, process_name)
pid = wait_for_process(process_name)
log.info("Процесс запущен (PID %s), ожидание %s сек...", pid, startup_delay)
time.sleep(startup_delay)
set_status(config, pid=pid)
return 0
if __name__ == "__main__":
sys.exit(main())