File: //lib/python3/dist-packages/UpdateManager/Core/MetaRelease.py
# MetaRelease.py
# -*- Mode: Python; indent-tabs-mode: nil; tab-width: 4; coding: utf-8 -*-
#
#  Copyright (c) 2004,2005 Canonical
#
#  Author: Michael Vogt <michael.vogt@ubuntu.com>
#
#  This program is free software; you can redistribute it and/or
#  modify it under the terms of the GNU General Public License as
#  published by the Free Software Foundation; either version 2 of the
#  License, or (at your option) any later version.
#
#  This program is distributed in the hope that it will be useful,
#  but WITHOUT ANY WARRANTY; without even the implied warranty of
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#  GNU General Public License for more details.
#
#  You should have received a copy of the GNU General Public License
#  along with this program; if not, write to the Free Software
#  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
#  USA
import apt
import apt_pkg
import distro_info
import configparser
from http.client import BadStatusLine
import logging
import email.utils
import os
import socket
import sys
import time
import threading
from urllib.parse import quote
from urllib.request import Request, urlopen
from urllib.error import HTTPError, URLError
from .utils import (
    get_lang,
    get_dist,
    get_dist_version,
    get_ubuntu_flavor,
    get_ubuntu_flavor_name,
)
class MetaReleaseParseError(Exception):
    pass
class Dist(object):
    def __init__(self, name, version, date, supported):
        self.name = name
        self.version = version
        self.date = date
        self.supported = supported
        self.releaseNotesURI = None
        self.releaseNotesHtmlUri = None
        self.upgradeTool = None
        self.upgradeToolSig = None
        # the server may report that the upgrade is broken currently
        self.upgrade_broken = None
class MetaReleaseCore(object):
    """
    A MetaReleaseCore object abstracts the list of released
    distributions.
    """
    DEBUG = "DEBUG_UPDATE_MANAGER" in os.environ
    # some constants
    CONF = "/etc/update-manager/release-upgrades"
    CONF_METARELEASE = "/etc/update-manager/meta-release"
    def __init__(
        self,
        useDevelopmentRelease=False,
        useProposed=False,
        debug=False,
        forceLTS=False,
        forceDownload=False,
        cache=None,
    ):
        if debug:
            self.DEBUG = True
        self._debug(
            "MetaRelease.__init__() useDevel=%s useProposed=%s"
            % (useDevelopmentRelease, useProposed)
        )
        # force download instead of sending if-modified-since
        self.forceDownload = forceDownload
        self.useDevelopmentRelease = useDevelopmentRelease
        # information about the available dists
        self.downloaded = threading.Event()
        self.upgradable_to = None
        self.new_dist = None
        if cache is None:
            cache = apt.Cache()
        self.flavor = get_ubuntu_flavor(cache=cache)
        self.flavor_name = get_ubuntu_flavor_name(cache=cache)
        self.current_dist_name = get_dist()
        self.current_dist_version = get_dist_version()
        self.no_longer_supported = None
        self.prompt = None
        # default (if the conf file is missing)
        base_uri = "https://changelogs.ubuntu.com/"
        self.METARELEASE_URI = base_uri + "meta-release"
        self.METARELEASE_URI_LTS = base_uri + "meta-release-lts"
        self.METARELEASE_URI_UNSTABLE_POSTFIX = "-development"
        self.METARELEASE_URI_PROPOSED_POSTFIX = "-proposed"
        # check the meta-release config first
        parser = configparser.ConfigParser()
        if os.path.exists(self.CONF_METARELEASE):
            try:
                parser.read(self.CONF_METARELEASE)
            except configparser.Error as e:
                sys.stderr.write(
                    "ERROR: failed to read '%s':\n%s"
                    % (self.CONF_METARELEASE, e)
                )
                return
            # make changing the metarelease file and the location
            # for the files easy
            if parser.has_section("METARELEASE"):
                sec = "METARELEASE"
                for k in [
                    "URI",
                    "URI_LTS",
                    "URI_UNSTABLE_POSTFIX",
                    "URI_PROPOSED_POSTFIX",
                ]:
                    if parser.has_option(sec, k):
                        self._debug(
                            "%s: %s "
                            % (self.CONF_METARELEASE, parser.get(sec, k))
                        )
                        setattr(self, "%s_%s" % (sec, k), parser.get(sec, k))
        # check the config file first to figure if we want lts upgrades only
        parser = configparser.ConfigParser()
        if os.path.exists(self.CONF):
            try:
                parser.read(self.CONF)
            except configparser.Error as e:
                sys.stderr.write(
                    "ERROR: failed to read '%s':\n%s" % (self.CONF, e)
                )
                return
            # now check which specific url to use
            if parser.has_option("DEFAULT", "Prompt"):
                prompt = parser.get("DEFAULT", "Prompt").lower()
                if prompt == "never" or prompt == "no":
                    self.prompt = "never"
                    # nothing to do for this object
                    # FIXME: what about no longer supported?
                    self.downloaded.set()
                    return
                elif prompt == "lts":
                    self.prompt = "lts"
                    # the Prompt=lts setting only makes sense when running on
                    # a LTS, otherwise it would result in users not receiving
                    # any distro upgrades
                    di = distro_info.UbuntuDistroInfo()
                    if di.is_lts(self.current_dist_name):
                        self.METARELEASE_URI = self.METARELEASE_URI_LTS
                    else:
                        self._debug("Prompt=lts for non-LTS, ignoring")
                else:
                    self.prompt = "normal"
        # needed for the _tryUpgradeSelf() code in DistUpgradeController
        if forceLTS:
            self.METARELEASE_URI = self.METARELEASE_URI_LTS
        # devel and proposed "just" change the postfix
        if useDevelopmentRelease:
            self.METARELEASE_URI += self.METARELEASE_URI_UNSTABLE_POSTFIX
        elif useProposed:
            self.METARELEASE_URI += self.METARELEASE_URI_PROPOSED_POSTFIX
        self._debug("metarelease-uri: %s" % self.METARELEASE_URI)
        self.metarelease_information = None
        if not self._buildMetaReleaseFile():
            self._debug("_buildMetaReleaseFile failed")
            return
        # we start the download thread here and we have a timeout
        threading.Thread(target=self.download).start()
        # threading.Thread(target=self.check).start()
    def _buildMetaReleaseFile(self):
        # build the metarelease_file name
        self.METARELEASE_FILE = os.path.join(
            "/var/lib/update-manager/", os.path.basename(self.METARELEASE_URI)
        )
        # check if we can write to the global location, if not,
        # write to homedir
        try:
            open(self.METARELEASE_FILE, "a").close()
        except IOError:
            cache_dir = os.getenv(
                "XDG_CACHE_HOME", os.path.expanduser("~/.cache")
            )
            # Take special care when creating this directory; ~/.cache needs
            # to be created with mode 0700, but the other directories do
            # not.
            cache_parent_dir = os.path.split(cache_dir)[0]
            if not os.path.exists(cache_parent_dir):
                try:
                    os.makedirs(cache_parent_dir)
                except OSError as e:
                    sys.stderr.write("mkdir() failed: '%s'" % e)
                    return False
            if not os.path.exists(cache_dir):
                try:
                    os.mkdir(cache_dir, 0o700)
                except OSError as e:
                    sys.stderr.write("mkdir() failed: '%s'" % e)
                    return False
            path = os.path.join(cache_dir, "update-manager-core")
            if not os.path.exists(path):
                try:
                    os.mkdir(path)
                except OSError as e:
                    sys.stderr.write("mkdir() failed: '%s'" % e)
                    return False
            self.METARELEASE_FILE = os.path.join(
                path, os.path.basename(self.METARELEASE_URI)
            )
        # if it is empty, remove it to avoid I-M-S hits on empty file
        try:
            if os.path.getsize(self.METARELEASE_FILE) == 0:
                os.unlink(self.METARELEASE_FILE)
        except Exception:
            pass
        return True
    def dist_no_longer_supported(self, dist):
        """virtual function that is called when the distro is no longer
        supported
        """
        self.no_longer_supported = dist
    def new_dist_available(self, dist):
        """virtual function that is called when a new distro release
        is available
        """
        self.new_dist = dist
    def parse(self):
        self._debug("MetaRelease.parse()")
        current_dist_name = self.current_dist_name
        self._debug("current dist name: '%s'" % current_dist_name)
        current_dist = None
        dists = []
        # parse the metarelease_information file
        index_tag = apt_pkg.TagFile(self.metarelease_information)
        try:
            while index_tag.step():
                for required_key in ("Dist", "Version", "Supported", "Date"):
                    if required_key not in index_tag.section:
                        raise MetaReleaseParseError(
                            "Required key '%s' missing" % required_key
                        )
                name = index_tag.section["Dist"]
                self._debug("found distro name: '%s'" % name)
                rawdate = index_tag.section["Date"]
                parseddate = list(email.utils.parsedate(rawdate))
                parseddate[8] = 0  # assume no DST
                date = time.mktime(tuple(parseddate))
                supported = int(index_tag.section["Supported"])
                version = index_tag.section["Version"]
                # add the information to a new date object
                dist = Dist(name, version, date, supported)
                if "ReleaseNotes" in index_tag.section:
                    dist.releaseNotesURI = index_tag.section["ReleaseNotes"]
                    lang = get_lang()
                    if lang:
                        dist.releaseNotesURI += "?lang=%s" % lang
                if "ReleaseNotesHtml" in index_tag.section:
                    dist.releaseNotesHtmlUri = index_tag.section[
                        "ReleaseNotesHtml"
                    ]
                    query = self._get_release_notes_uri_query_string(dist)
                    if query:
                        dist.releaseNotesHtmlUri += query
                if "UpgradeTool" in index_tag.section:
                    dist.upgradeTool = index_tag.section["UpgradeTool"]
                if "UpgradeToolSignature" in index_tag.section:
                    dist.upgradeToolSig = index_tag.section[
                        "UpgradeToolSignature"
                    ]
                if "UpgradeBroken" in index_tag.section:
                    dist.upgrade_broken = index_tag.section["UpgradeBroken"]
                dists.append(dist)
                if name == current_dist_name:
                    current_dist = dist
        except apt_pkg.Error:
            raise MetaReleaseParseError(
                "Unable to parse %s" % self.METARELEASE_URI
            )
        self.metarelease_information.close()
        self.metarelease_information = None
        # first check if the current runing distro is in the meta-release
        # information. if not, we assume that we run on something not
        # supported and silently return
        if current_dist is None:
            self._debug("current dist not found in meta-release file\n")
            return False
        # then see what we can upgrade to
        upgradable_to = ""
        for dist in dists:
            if dist.date > current_dist.date:
                # Only offer to upgrade to an unsupported release if running
                # with useDevelopmentRelease, this way one can upgrade from an
                # LTS release to the next supported non-LTS release e.g. from
                # 14.04 to 15.04.
                if not dist.supported and not self.useDevelopmentRelease:
                    continue
                upgradable_to = dist
                self._debug("new dist: %s" % upgradable_to)
                break
        # only warn if unsupported and a new dist is available (because
        # the development version is also unsupported)
        if upgradable_to != "" and not current_dist.supported:
            self.upgradable_to = upgradable_to
            self.dist_no_longer_supported(current_dist)
        if upgradable_to != "":
            self.upgradable_to = upgradable_to
            self.new_dist_available(upgradable_to)
        # parsing done and sucessfully
        return True
    # the network thread that tries to fetch the meta-index file
    # can't touch the gui, runs as a thread
    def download(self):
        self._debug("MetaRelease.download()")
        lastmodified = 0
        req = Request(self.METARELEASE_URI)
        # make sure that we always get the latest file (#107716)
        req.add_header("Cache-Control", "No-Cache")
        req.add_header("Pragma", "no-cache")
        if os.access(self.METARELEASE_FILE, os.W_OK):
            try:
                lastmodified = os.stat(self.METARELEASE_FILE).st_mtime
            except OSError:
                pass
        if lastmodified > 0 and not self.forceDownload:
            req.add_header(
                "If-Modified-Since", time.asctime(time.gmtime(lastmodified))
            )
        try:
            # open
            uri = urlopen(req, timeout=20)
            # sometime there is a root owned meta-relase file
            # there, try to remove it so that we get it
            # with proper permissions
            if os.path.exists(self.METARELEASE_FILE) and not os.access(
                self.METARELEASE_FILE, os.W_OK
            ):
                try:
                    os.unlink(self.METARELEASE_FILE)
                except OSError as e:
                    print(
                        "Can't unlink '%s' (%s)" % (self.METARELEASE_FILE, e)
                    )
            # we may get exception here on e.g. disk full
            try:
                f = open(self.METARELEASE_FILE, "w+")
                for line in uri.readlines():
                    f.write(line.decode("UTF-8"))
                f.flush()
                f.seek(0, 0)
                self.metarelease_information = f
            except IOError:
                pass
            uri.close()
        # http error
        except HTTPError as e:
            # mvo: only reuse local info on "not-modified"
            if e.code == 304 and os.path.exists(self.METARELEASE_FILE):
                self._debug("reading file '%s'" % self.METARELEASE_FILE)
                self.metarelease_information = open(self.METARELEASE_FILE, "r")
            else:
                self._debug("result of meta-release download: '%s'" % e)
        # generic network error
        except (URLError, BadStatusLine, socket.timeout) as e:
            self._debug("result of meta-release download: '%s'" % e)
            print(
                "Failed to connect to %s. Check your Internet connection "
                "or proxy settings" % self.METARELEASE_URI
            )
        # now check the information we have
        if self.metarelease_information is not None:
            self._debug("have self.metarelease_information")
            try:
                self.parse()
            except Exception:
                logging.exception(
                    "parse failed for '%s'" % self.METARELEASE_FILE
                )
                # no use keeping a broken file around
                os.remove(self.METARELEASE_FILE)
            # we don't want to keep a meta-release file around when it
            # has a "Broken" flag, this ensures we are not bitten by
            # I-M-S/cache issues
            if self.new_dist and self.new_dist.upgrade_broken:
                os.remove(self.METARELEASE_FILE)
        else:
            self._debug("NO self.metarelease_information")
        self.downloaded.set()
    @property
    def downloading(self):
        return not self.downloaded.is_set()
    def _get_release_notes_uri_query_string(self, dist):
        q = "?"
        # get the lang
        lang = get_lang()
        if lang:
            q += "lang=%s&" % lang
        # get the os
        q += "os=%s&" % self.flavor
        # get the version to upgrade to
        q += "ver=%s" % dist.version
        # the archive didn't respond well to ? being %3F
        return quote(q, "/?")
    def _debug(self, msg):
        if self.DEBUG:
            sys.stderr.write(msg + "\n")
if __name__ == "__main__":
    meta = MetaReleaseCore(False, False)