File: //sbin/imunify360-watchdog
#!/opt/imunify360/venv/bin/python3
import argparse
import json
import logging
import logging.handlers
import os.path
import shutil
import socket
import subprocess
import time
from pathlib import Path
from typing import Optional
from defence360agent import sentry
from defence360agent.contracts.config import GENERIC_SENSOR_SOCKET_PATH
from defence360agent.utils import is_centos6_or_cloudlinux6
# Do not let a failing log handler (e.g. an unreachable /dev/log syslog
# socket) raise out of logging calls; see logging.Handler.handleError.
logging.raiseExceptions = False
# Seconds allowed for connecting to the generic sensor socket.
CONNECT_TIMEOUT = 10
# Seconds budget for a request: used as the timeout for sendall(), and the
# unused remainder becomes the socket timeout while reading the reply.
REQUEST_TIMEOUT = 60
# Seconds to sleep between failed HEALTH RPC attempts.
RETRY_DELAY = 10
# A service reporting migrations for longer than this is not treated as
# "migrating" any more, so normal health checks resume.
MIGRATION_TIMEOUT = 4 * 60 * 60 # 4 hours
# Unit/service names passed to systemctl or service(8).
IMUNIFY360 = "imunify360"
IMUNIFY360_AGENT = "imunify360-agent"
IMUNIFY360_AGENT_SOCKET = "imunify360-agent.socket"
# Agent RPC socket file; its existence is checked explicitly because the
# socket unit's status alone is not trusted (see ensure_agent_health).
AGENT_SOCKET_PATH = "/var/run/defence360agent/simple_rpc.sock"
# Fallback command used when no systemctl executable is found.
SERVICE = "service"
# Default timeout (seconds) for run(); on expiry run() reports returncode None.
SUBPROCESS_TIMEOUT = 1800
# Sub-commands for systemctl / service.
RESTART = "restart"
STATUS = "status"
SHOW = "show"
# Substring of the unit's StatusText that means migrations are in progress.
AGENT_IN_MIGRATION_STATE = "Applying database migrations"
# Directory scanned for stale "*.log" files by check_outdated_gw_logs.
I360_GW_DIR = "/var/imunify360/gw.dir"
# "*.log" files in I360_GW_DIR not modified for this many seconds (2 hours)
# are considered outdated.
I360_GW_TTL_SEC = 2 * 60 * 60
def run(cmd, *, timeout=SUBPROCESS_TIMEOUT, check=False, **kwargs):
    """Execute *cmd* through :func:`subprocess.run`, never raising on timeout.

    All keyword arguments are forwarded to :func:`subprocess.run`. If the
    command exceeds *timeout* seconds, a :class:`subprocess.CompletedProcess`
    is returned instead of raising ``TimeoutExpired``. Its ``returncode`` is
    ``None``, and it carries whatever output had been captured so far.
    ``CalledProcessError`` is still raised when *check* is true and the
    command exits non-zero.
    """
    try:
        completed = subprocess.run(cmd, timeout=timeout, check=check, **kwargs)
    except subprocess.TimeoutExpired as expired:
        completed = subprocess.CompletedProcess(
            expired.cmd,
            returncode=None,
            stdout=expired.stdout,
            stderr=expired.stderr,
        )
    return completed
def service_is_running(systemctl_exec: Optional[str], name: str) -> bool:
    """Return True when the service manager's status command for *name* exits 0.

    Runs ``<systemctl> status <name>`` when *systemctl_exec* is given and
    ``service <name> status`` otherwise. All standard streams are discarded.
    A non-zero exit code, or a timeout (returncode ``None`` from ``run``),
    yields False.
    """
    command = (
        [systemctl_exec, STATUS, name] if systemctl_exec else [SERVICE, name, STATUS]
    )
    completed = run(
        command,
        stdin=subprocess.DEVNULL,
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
    )
    return completed.returncode == 0
def restart_service(systemctl_exec: Optional[str], name: str) -> None:
    """Restart service *name* via ``systemctl restart`` or ``service <name> restart``.

    The systemctl form is used when *systemctl_exec* is given. All standard
    streams are discarded. The command's exit status (including a timeout
    reported by ``run``) is ignored, so failures are not reported to the
    caller.
    """
    if not systemctl_exec:
        command = [SERVICE, name, RESTART]
    else:
        command = [systemctl_exec, RESTART, name]
    run(
        command,
        stdin=subprocess.DEVNULL,
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
    )
def restart_imunify360(systemctl_exec: Optional[str]) -> None:
    """Restart the resident ``imunify360`` service; the exit status is not checked."""
    restart_service(systemctl_exec=systemctl_exec, name=IMUNIFY360)
def restart_imunify360_agent(systemctl_exec: Optional[str]) -> None:
    """Restart the non-resident ``imunify360-agent`` service; the exit status is not checked."""
    restart_service(systemctl_exec=systemctl_exec, name=IMUNIFY360_AGENT)
def setup_logging(level) -> logging.Logger:
    """Configure and return the ``imunify360-watchdog`` logger.

    Sets *level* on the logger and attaches a handler that writes to the
    local syslog socket ``/dev/log``, prefixing each record with the logger
    name. It also initialises Sentry reporting. Each call adds another
    handler, so this is meant to be called once per process.
    """
    watchdog_logger = logging.getLogger("imunify360-watchdog")
    watchdog_logger.setLevel(level)
    syslog_handler = logging.handlers.SysLogHandler("/dev/log")
    syslog_handler.setFormatter(logging.Formatter("%(name)s: %(message)s"))
    watchdog_logger.addHandler(syslog_handler)
    sentry.configure_sentry()
    return watchdog_logger
def send_to_generic_socket(_msg):
    """Send *_msg* as one JSON line to the generic sensor socket and decode the reply.

    Connects within CONNECT_TIMEOUT seconds. It writes ``json.dumps(_msg)``
    followed by a newline, then reads a single UTF-8 line using whatever is
    left of the REQUEST_TIMEOUT budget after sending.

    Raises:
        socket.timeout / OSError: connection or I/O failure, or the budget
            was used up by sending.
        ValueError: the peer closed the connection without replying.
        json.JSONDecodeError: the reply line is not valid JSON.
    """
    with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as conn:
        conn.settimeout(CONNECT_TIMEOUT)
        conn.connect(GENERIC_SENSOR_SOCKET_PATH)
        payload = json.dumps(_msg).encode() + b"\n"
        started = time.monotonic()
        conn.settimeout(REQUEST_TIMEOUT)
        conn.sendall(payload)
        # Give the read only the part of the budget that sending did not use.
        time_left = started + REQUEST_TIMEOUT - time.monotonic()
        if time_left <= 0:
            raise socket.timeout()
        conn.settimeout(time_left)
        with conn.makefile(encoding="utf-8") as reader:
            reply_line = reader.readline()
    if not reply_line:
        raise ValueError("Empty response from socket")
    return json.loads(reply_line)
def check_agent_socket_alive(systemctl_exec: Optional[str]) -> bool:
    """Ask the service manager whether the agent's entry point is running.

    On CentOS 6 / CloudLinux 6 the ``imunify360-agent`` service itself is
    queried; elsewhere the ``imunify360-agent.socket`` unit is queried
    (presumably because those older systems lack socket units — TODO confirm).
    """
    unit = (
        IMUNIFY360_AGENT if is_centos6_or_cloudlinux6() else IMUNIFY360_AGENT_SOCKET
    )
    return service_is_running(systemctl_exec, unit)
def check_outdated_gw_logs(gw_dir: Path, ttl_sec: float) -> bool:
    """Delete stale ``*.log`` files in *gw_dir*; return True only if none were found.

    Despite its name, the return value means "directory is clean". A regular
    ``*.log`` file whose mtime is older than *ttl_sec* seconds is removed and
    reported, and the function then returns False.

    The function also returns False when *gw_dir* does not exist, or when
    scanning/removal fails. In the failure case the whole directory is
    removed as well. The caller restarts the resident service on False.
    """
    # Use the watchdog's configured logger; the root logger has no syslog
    # handler, so messages sent to it never reach syslog.
    logger = logging.getLogger("imunify360-watchdog")
    if not gw_dir.exists():
        # NOTE(review): a missing directory is treated as unhealthy (restart),
        # presumably because the service recreates it on start — confirm.
        logger.error("%s does not exist", gw_dir)
        return False
    mtime_threshold = time.time() - ttl_sec
    outdated = []
    try:
        for entry in gw_dir.iterdir():
            if entry.suffix != ".log":
                continue
            try:
                if not entry.is_file() or entry.stat().st_mtime >= mtime_threshold:
                    continue
                entry.unlink()
            except FileNotFoundError:
                # Removed concurrently between listing and stat/unlink; that
                # is not a reason to wipe the whole directory.
                continue
            outdated.append(entry.name)
    except Exception as e:
        logger.error("error iterating through log files: %s", e)
        # Best effort: a failing rmtree must not crash the watchdog before
        # the caller gets a chance to restart the service.
        shutil.rmtree(gw_dir, ignore_errors=True)
        return False
    if outdated:
        logger.error("outdated files found: %s", outdated)
    return not outdated
def generic_sensor_with_retries(rpc_timeout: int) -> Optional[dict]:
    """Send a HEALTH request to the generic sensor, retrying for *rpc_timeout* seconds.

    Any exception from a single attempt is retried after RETRY_DELAY seconds.
    Once *rpc_timeout* seconds have elapsed, the last exception is re-raised.
    The return value is the decoded JSON reply.
    """
    # Monotonic clock: wall-clock adjustments (NTP, manual changes) must not
    # shorten or stretch the retry window.
    deadline = time.monotonic() + rpc_timeout
    while True:
        try:
            return send_to_generic_socket({"method": "HEALTH"})
        except Exception:
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                raise
            # Do not sleep past the deadline; one last attempt follows.
            time.sleep(min(RETRY_DELAY, remaining))
def systemctl_executable() -> Optional[str]:
    """Locate ``systemctl`` on the default search path ``os.defpath``.

    Returns the path found by :func:`shutil.which`, or None when there is
    no such executable.
    """
    found = shutil.which("systemctl", path=os.defpath)
    return found
def _parse_show_output(output: str) -> dict:
    """Extract StatusText / ExecMainStartTimestampMonotonic from ``Key=Value`` lines.

    Lines without ``=`` and other keys are ignored; for repeated keys the
    last value wins.
    """
    wanted = ("StatusText", "ExecMainStartTimestampMonotonic")
    params = {}
    for line in output.splitlines():
        key, sep, value = line.partition("=")
        if sep and key in wanted:
            params[key] = value
    return params


def service_is_migrating(systemctl_exec, name, logger):
    """Return True if *name* is applying DB migrations and has not exceeded MIGRATION_TIMEOUT.

    The state comes from ``systemctl show <name> -p StatusText -p
    ExecMainStartTimestampMonotonic``. Without systemctl it falls back to
    ``service <name> show``; NOTE(review): that fallback is assumed to print
    the same ``Key=Value`` lines — confirm.

    Missing or unparsable output (including a timed-out command) is treated
    as "not migrating", so the normal health checks run instead of the
    watchdog crashing.
    """
    if systemctl_exec:
        cmd = [
            systemctl_exec,
            SHOW,
            name,
            "-p",
            "StatusText",
            "-p",
            "ExecMainStartTimestampMonotonic",
        ]
    else:
        cmd = [SERVICE, name, SHOW]
    cp = run(
        cmd,
        stdin=subprocess.DEVNULL,
        stdout=subprocess.PIPE,
        stderr=subprocess.DEVNULL,
    )
    # On timeout run() returns whatever was captured, which may be None.
    params = _parse_show_output((cp.stdout or b"").decode(errors="replace"))
    if AGENT_IN_MIGRATION_STATE not in params.get("StatusText", ""):
        return False
    try:
        # systemd reports microseconds of CLOCK_MONOTONIC; CPython's
        # time.monotonic() uses the same clock on Linux.
        started_at = int(params["ExecMainStartTimestampMonotonic"]) / 1e6
    except (KeyError, ValueError):
        # Cannot measure the duration: assume the migration is still within
        # its limit rather than interrupting it.
        logger.warning("%s is migrating, but its start time is unknown", name)
        return True
    # Measured from the main process start, presumably when migrations begin.
    migration_duration = time.monotonic() - started_at
    logger.info("%s migrating for %d sec", name, migration_duration)
    if migration_duration < MIGRATION_TIMEOUT:
        return True
    logger.error("Migration took too long")
    return False
def ensure_resident_health(
    logger: logging.Logger, systemctl_exec: Optional[str], rpc_timeout: int
) -> None:
    """Check the resident ``imunify360`` service and restart it when unhealthy.

    The service is restarted when any of these holds:

    * the HEALTH RPC still fails after retrying for *rpc_timeout* seconds;
    * the reply is not a JSON object, or its ``healthy`` field is not truthy;
    * ``check_outdated_gw_logs`` reports a problem in I360_GW_DIR.

    Otherwise the healthy status and its ``why`` field are logged.
    """
    try:
        response = generic_sensor_with_retries(rpc_timeout)
    except Exception:
        logger.exception("Restarting resident service due to RPC failures")
        restart_imunify360(systemctl_exec)
        return

    # A reply that decodes to null/list/str has no health fields; treat it as
    # unhealthy instead of crashing on ``.get``.
    if not isinstance(response, dict):
        logger.error(
            "Restarting resident service due to unexpected health report: %r",
            response,
        )
        restart_imunify360(systemctl_exec)
        return

    if not response.get("healthy", False):
        logger.error(
            "Restarting resident service due to health report: %s",
            response.get("why") or response.get("error"),
        )
        restart_imunify360(systemctl_exec)
        return

    if not check_outdated_gw_logs(Path(I360_GW_DIR), I360_GW_TTL_SEC):
        logger.error("Restarting resident service due to outdated files present")
        restart_imunify360(systemctl_exec)
        return

    logger.info("%s is healthy: %s", IMUNIFY360, response.get("why"))
def ensure_agent_health(
    logger: logging.Logger, systemctl_exec: Optional[str]
) -> None:
    """Restart ``imunify360-agent`` if its RPC socket looks unavailable.

    The agent is considered unhealthy when the service manager does not
    report its socket/service as running, when the socket file
    AGENT_SOCKET_PATH is missing, or when either check raises.
    """
    try:
        # `service *.sock status` returns 0 even when the socket file does
        # not exist, so the file is checked explicitly as well.
        healthy = (
            check_agent_socket_alive(systemctl_exec)
            and Path(AGENT_SOCKET_PATH).exists()
        )
    except Exception as e:
        logger.exception("Restarting agent due to %s", e)
        restart_imunify360_agent(systemctl_exec)
        return
    if not healthy:
        # Not inside an except block: logger.exception would attach a bogus
        # "NoneType: None" traceback, so log a plain error.
        logger.error("Restarting agent due to socket failures")
        restart_imunify360_agent(systemctl_exec)
def main(rpc_timeout, log_level=logging.INFO):
    """Run one watchdog pass.

    Nothing is checked while the resident service is stopped or still
    applying migrations. Otherwise the agent health is checked first,
    followed by the resident service health (HEALTH RPC with *rpc_timeout*
    seconds of retries).
    """
    log = setup_logging(log_level)
    systemctl = systemctl_executable()

    if not service_is_running(systemctl, IMUNIFY360):
        log.info("%s is not running", IMUNIFY360)
        return
    if service_is_migrating(systemctl, IMUNIFY360, log):
        log.info("%s is migrating at the moment", IMUNIFY360)
        return

    ensure_agent_health(log, systemctl)
    ensure_resident_health(log, systemctl, rpc_timeout)
def parse_args() -> argparse.Namespace:
    """Parse the command line: one required integer ``rpc_timeout`` (seconds of HEALTH RPC retries)."""
    cli = argparse.ArgumentParser()
    cli.add_argument("rpc_timeout", type=int)
    namespace = cli.parse_args()
    return namespace
if __name__ == "__main__":
    # Script entry point: the only CLI argument is the RPC retry timeout.
    main(rpc_timeout=parse_args().rpc_timeout)