Commit 16fbbe53 authored by Дмитрий Никулин's avatar Дмитрий Никулин Committed by Никита Ефремов

Add support for getting last system update dates

parent 399a4ffe
import os import os
import errno
import re import re
from datetime import datetime
from settingsd import const from settingsd import const
from settingsd import config from settingsd import config
from settingsd import service from settingsd import service
...@@ -10,8 +12,18 @@ from settingsd.tools.process import execProcess ...@@ -10,8 +12,18 @@ from settingsd.tools.process import execProcess
INTERFACE_NAME = "packageUpdates" INTERFACE_NAME = "packageUpdates"
SERVICE_NAME = "package_updates" SERVICE_NAME = "package_updates"
APT_EXECUTABLE = "/usr/bin/apt" APT_EXECUTABLE = "/usr/bin/apt"
APT_COMMANDS = (
'install', 'upgrade', 'update', 'remove', 'purge', 'dist-upgrade', 'search',
'autoremove', 'dselect-upgrade', 'check', 'clean', 'autoclean', 'source',
'download', 'changelog', 'build-dep', 'full-upgrade', 'show', 'list',
'edit-sources'
)
PACKAGE_REGEX = re.compile(r'^(\S+)/\S+') PACKAGE_REGEX = re.compile(r'^(\S+)/\S+')
MULTIPLE_SPACES_REGEX = re.compile(r'\s{2,}')
HISTORY_BLOCK_SIZE = 2 ** 16 # bytes
class PackageUpdates(service.FunctionObject) : class PackageUpdates(service.FunctionObject) :
@service.functionMethod(INTERFACE_NAME, in_signature="", out_signature="as") @service.functionMethod(INTERFACE_NAME, in_signature="", out_signature="as")
...@@ -19,6 +31,24 @@ class PackageUpdates(service.FunctionObject) : ...@@ -19,6 +31,24 @@ class PackageUpdates(service.FunctionObject) :
execProcess([APT_EXECUTABLE, 'update']) execProcess([APT_EXECUTABLE, 'update'])
raw_output, = execProcess([APT_EXECUTABLE, 'list', '--upgradable'])[:1] raw_output, = execProcess([APT_EXECUTABLE, 'list', '--upgradable'])[:1]
return self._extract_upgradable_packages(raw_output.decode('utf-8')) return self._extract_upgradable_packages(raw_output.decode('utf-8'))
@service.functionMethod(INTERFACE_NAME, in_signature="", out_signature="i")
def get_last_update_date(self):
with open('/var/log/apt/history.log', 'r') as apt_history_file:
apt_history_file.seek(0, os.SEEK_END)
file_size = apt_history_file.tell()
full_block_count = file_size // HISTORY_BLOCK_SIZE
block = 0
while True:
offset = 0 if block >= full_block_count else file_size - HISTORY_BLOCK_SIZE * (block + 1)
result = self._try_extract_upgrade_date(apt_history_file, offset, file_size)
if result:
return result.timestamp()
if block < full_block_count:
block += 1
else:
return 0
@service.functionMethod(INTERFACE_NAME, in_signature="", out_signature="") @service.functionMethod(INTERFACE_NAME, in_signature="", out_signature="")
def install_updates(self): def install_updates(self):
...@@ -29,6 +59,53 @@ class PackageUpdates(service.FunctionObject) : ...@@ -29,6 +59,53 @@ class PackageUpdates(service.FunctionObject) :
matches = [PACKAGE_REGEX.match(line) for line in lines] matches = [PACKAGE_REGEX.match(line) for line in lines]
return [m[1] for m in matches if m] return [m[1] for m in matches if m]
def _parse_apt_block(self, block):
fields = {}
for line in block.split('\n'):
parts = line.split(':', 1)
if len(parts) != 2:
continue
fields[parts[0].strip()] = parts[1].strip()
return fields
def _parse_apt_operations(self, blocks):
operations = []
for block in blocks:
fields = self._parse_apt_block(block)
if all(key in fields for key in ('Start-Date', 'Commandline', 'End-Date')):
operations.append({
'startDate': self._parse_date(fields['Start-Date']),
'command': self._parse_command(fields['Commandline'])
})
return operations
def _parse_command(self, commandline):
words = [w for w in commandline.split(' ') if len(w) and not w.startswith('-')]
for word in words:
if word in APT_COMMANDS:
return word
def _parse_date(self, date_str):
date_str = MULTIPLE_SPACES_REGEX.sub(' ', date_str)
return datetime.strptime(date_str, '%Y-%m-%d %H:%M:%S')
def _read_apt_blocks(self, apt_history_file, offset, block_size):
apt_history_file.seek(offset, os.SEEK_SET)
data = apt_history_file.read(block_size)
return data.split('\n\n')
def _try_extract_upgrade_date(self, apt_history_file, offset, file_size):
limit = max(file_size - offset, HISTORY_BLOCK_SIZE * 2) + 1
for block_size in range(HISTORY_BLOCK_SIZE * 2, limit, HISTORY_BLOCK_SIZE):
apt_operation_blocks = self._read_apt_blocks(apt_history_file, offset, block_size)
apt_operations = self._parse_apt_operations(apt_operation_blocks)
if len(apt_operations):
break
for operation in apt_operations[::-1]:
if operation['command'] == 'upgrade':
return operation['startDate']
class Service(service.Service) : class Service(service.Service) :
def initService(self): def initService(self):
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment