Source code for httpserver

# -*- coding: utf-8 -*-

##########################################################################
# OpenWebif: httpserver
##########################################################################
# Copyright (C) 2011 - 2020 E2OpenPlugins
#
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software Foundation,
# Inc., 51 Franklin Street, Fifth Floor, Boston MA 02110-1301, USA.
##########################################################################

from __future__ import print_function
import enigma
from Screens.MessageBox import MessageBox
from Components.config import config
from Tools.Directories import fileExists
from twisted import version
from twisted.internet import reactor, ssl
from twisted.web import server, http, resource
from twisted.internet.error import CannotListenError

from Plugins.Extensions.OpenWebif.controllers.root import RootController
from Plugins.Extensions.OpenWebif.sslcertificate import SSLCertificateGenerator, KEY_FILE, CERT_FILE, CA_FILE, CHAIN_FILE
from socket import has_ipv6
from OpenSSL import SSL
from OpenSSL import crypto
from Components.Network import iNetwork

import os
import imp
import ipaddress
import six

global listener, server_to_stop, site, sslsite
listener = []


def getAllNetworks():
	tempaddrs = []
	# Get all IP networks
	if fileExists('/proc/net/if_inet6'):
		if has_ipv6 and version.major >= 12:
			proc = '/proc/net/if_inet6'
			for line in open(proc).readlines():
				# Skip localhost
				if line.startswith('00000000000000000000000000000001'):
					continue

				tmp = line.split()
				tmpaddr = str(ipaddress.ip_address(int(tmp[0], 16)))
				if tmp[2].lower() != "ff":
					tmpaddr = "%s/%s" % (tmpaddr, int(tmp[2].lower(), 16))
					tmpaddr = str(ipaddress.IPv6Network(six.text_type(tmpaddr), strict=False))

				tempaddrs.append(tmpaddr)
	# Crappy legacy IPv4 has no proc entry with clean addresses
	ifaces = iNetwork.getConfiguredAdapters()
	for iface in ifaces:
		# IPv4 and old fashioned netmask are served as silly arrays
		crap = iNetwork.getAdapterAttribute(iface, "ip")
		if not crap or len(crap) != 4:
			continue
		ip = '.'.join(str(x) for x in crap)
		netmask = str(sum([bin(int(x)).count('1') for x in iNetwork.getAdapterAttribute(iface, "netmask")]))
		ip = ip + "/" + netmask
		tmpaddr = str(ipaddress.IPv4Network(six.text_type(ip), strict=False))
		tempaddrs.append(tmpaddr)

	if tempaddrs == []:
		return None
	else:
		return tempaddrs


def verifyCallback(connection, x509, errnum, errdepth, ok):
	if not ok:
		print('[OpenWebif] Invalid cert from subject: ', x509.get_subject())
		return False
	else:
		print('[OpenWebif] Successful cert authed as: ', x509.get_subject())
	return True


def isOriginalWebifInstalled():
	pluginpath = enigma.eEnv.resolve('${libdir}/enigma2/python/Plugins/Extensions/WebInterface/plugin.py')
	if fileExists(pluginpath) or fileExists(pluginpath + "o") or fileExists(pluginpath + "c"):
		return True

	return False


def buildRootTree(session):
	root = RootController(session)

	if not isOriginalWebifInstalled():
		# this is an hack! any better idea?
		origwebifpath = enigma.eEnv.resolve('${libdir}/enigma2/python/Plugins/Extensions/WebInterface')
		hookpath = enigma.eEnv.resolve('${libdir}/enigma2/python/Plugins/Extensions/OpenWebif/pluginshook.src')
		if not os.path.islink(origwebifpath + "/WebChilds/Toplevel.py"):
			print("[OpenWebif] hooking original webif plugins")

			cleanuplist = [
				"/__init__.py",
				"/__init__.pyo",
				"/__init__.pyc",
				"/WebChilds/__init__.py",
				"/WebChilds/__init__.pyo",
				"/WebChilds/__init__.pyc",
				"/WebChilds/External/__init__.py",
				"/WebChilds/External/__init__.pyo",
				"/WebChilds/External/__init__.pyc",
				"/WebChilds/Toplevel.py",
				"/WebChilds/Toplevel.pyo"
				"/WebChilds/Toplevel.pyc"
			]

			for cleanupfile in cleanuplist:
				if fileExists(origwebifpath + cleanupfile):
					os.remove(origwebifpath + cleanupfile)

			if not os.path.exists(origwebifpath + "/WebChilds/External"):
				os.makedirs(origwebifpath + "/WebChilds/External")
			open(origwebifpath + "/__init__.py", "w").close()
			open(origwebifpath + "/WebChilds/__init__.py", "w").close()
			open(origwebifpath + "/WebChilds/External/__init__.py", "w").close()

			os.symlink(hookpath, origwebifpath + "/WebChilds/Toplevel.py")

		# import modules
		print("[OpenWebif] loading external plugins...")
		from Plugins.Extensions.WebInterface.WebChilds.Toplevel import loaded_plugins
		if len(loaded_plugins) == 0:
			externals = os.listdir(origwebifpath + "/WebChilds/External")
			loaded = []
			for external in externals:
				if external[-3:] == ".py":
					modulename = external[:-3]
				elif external[-4:] == ".pyo" or external[-4:] == ".pyc":
					modulename = external[:-4]
				else:
					continue

				if modulename == "__init__":
					continue

				if modulename in loaded:
					continue

				loaded.append(modulename)
				try:
					imp.load_source(modulename, origwebifpath + "/WebChilds/External/" + modulename + ".py")
				except Exception as e:
					# maybe there's only the compiled version
					imp.load_compiled(modulename, origwebifpath + "/WebChilds/External/" + external)

		if len(loaded_plugins) > 0:
			for plugin in loaded_plugins:
				root.putChild2(plugin[0], plugin[1])
				print("[OpenWebif] plugin '%s' loaded on path '/%s'" % (plugin[2], plugin[0]))
		else:
			print("[OpenWebif] no plugins to load")
	return root


[docs]def HttpdStart(session): """ Helper class to start web server Args: session: (?) session object """ if config.OpenWebif.enabled.value is True: global listener, site, sslsite port = config.OpenWebif.port.value if listener != None and len(listener) > 0: print("[OpenWebif] httpserver already started") return temproot = buildRootTree(session) root = AuthResource(session, temproot) site = server.Site(root) site.displayTracebacks = config.OpenWebif.displayTracebacks.value # start http webserver on configured port try: if has_ipv6 and fileExists('/proc/net/if_inet6') and version.major >= 12: # use ipv6 listener.append(reactor.listenTCP(port, site, interface='::')) else: # ipv4 only listener.append(reactor.listenTCP(port, site)) print("[OpenWebif] started on %i" % (port)) BJregisterService('http', port) except CannotListenError: print("[OpenWebif] failed to listen on Port %i" % (port)) if config.OpenWebif.https_clientcert.value is True and not os.path.exists(CA_FILE): # Disable https config.OpenWebif.https_enabled.value = False config.OpenWebif.https_enabled.save() # Inform the user session.open(MessageBox, "Cannot read CA certs for HTTPS access\nHTTPS access is disabled!", MessageBox.TYPE_ERROR) if config.OpenWebif.https_enabled.value is True: httpsPort = config.OpenWebif.https_port.value installCertificates(session) # start https webserver on port configured port try: try: key = crypto.load_privatekey(crypto.FILETYPE_PEM, open(KEY_FILE, 'rt').read()) cert = crypto.load_certificate(crypto.FILETYPE_PEM, open(CERT_FILE, 'rt').read()) print("[OpenWebif] CHAIN_FILE = %s" % CHAIN_FILE) chain = None if os.path.exists(CHAIN_FILE): chain = [crypto.load_certificate(crypto.FILETYPE_PEM, open(CHAIN_FILE, 'rt').read())] print("[OpenWebif] ssl chain file found - loading") context = ssl.CertificateOptions(privateKey=key, certificate=cert, extraCertChain=chain) except: # nosec # noqa: E722 # THIS EXCEPTION IS ONLY CATCHED WHEN CERT FILES ARE BAD (look below for error) print("[OpenWebif] failed to get valid cert files. (It could occure bad file save or format, removing...)") # removing bad files if os.path.exists(KEY_FILE): os.remove(KEY_FILE) if os.path.exists(CERT_FILE): os.remove(CERT_FILE) # regenerate new ones installCertificates(session) context = ssl.DefaultOpenSSLContextFactory(KEY_FILE, CERT_FILE) if config.OpenWebif.https_clientcert.value is True: ctx = context.getContext() ctx.set_verify( SSL.VERIFY_PEER | SSL.VERIFY_FAIL_IF_NO_PEER_CERT, verifyCallback ) ctx.load_verify_locations(CA_FILE) sslroot = AuthResource(session, temproot) sslsite = server.Site(sslroot) if has_ipv6 and fileExists('/proc/net/if_inet6') and version.major >= 12: # use ipv6 listener.append(reactor.listenSSL(httpsPort, sslsite, context, interface='::')) else: # ipv4 only listener.append(reactor.listenSSL(httpsPort, sslsite, context)) print("[OpenWebif] started on", httpsPort) BJregisterService('https', httpsPort) except CannotListenError: print("[OpenWebif] failed to listen on Port", httpsPort) except: # nosec # noqa: E722 print("[OpenWebif] failed to start https, disabling...") # Disable https config.OpenWebif.https_enabled.value = False config.OpenWebif.https_enabled.save() # Streaming requires listening on 127.0.0.1:80 if port != 80: try: if has_ipv6 and fileExists('/proc/net/if_inet6') and version.major >= 12: # use ipv6 # Dear Twisted devs: Learning English, lesson 1 - interface != address listener.append(reactor.listenTCP(80, site, interface='::1')) listener.append(reactor.listenTCP(80, site, interface='::ffff:127.0.0.1')) else: # ipv4 only listener.append(reactor.listenTCP(80, site, interface='127.0.0.1')) print("[OpenWebif] started stream listening on port 80") except CannotListenError: print("[OpenWebif] port 80 busy")
def HttpdStop(session): StopServer(session).doStop() def HttpdRestart(session): StopServer(session, HttpdStart).doStop() class AuthResource(resource.Resource): def __init__(self, session, root): resource.Resource.__init__(self) self.resource = root def noShell(self, user): if fileExists('/etc/passwd'): for line in open('/etc/passwd').readlines(): line = line.strip() if line.startswith(user + ":") and (line.endswith(":/bin/false") or line.endswith(":/sbin/nologin")): return True return False def render(self, request): host = request.getHost().host peer = request.getClientIP() if peer is None: peer = request.transport.socket.getpeername()[0] if peer.startswith("::ffff:"): peer = peer.replace("::ffff:", "") if peer.startswith("fe80::") and "%" in peer: peer = peer.split("%")[0] if self.login(request.getUser(), request.getPassword(), peer) is False: request.setHeader('WWW-authenticate', 'Basic realm="%s"' % ("OpenWebif")) errpage = resource.ErrorPage(http.UNAUTHORIZED, "Unauthorized", "401 Authentication required") return errpage.render(request) else: return self.resource.render(request) def getChildWithDefault(self, path, request): global site, sslsite session = request.getSession().sessionNamespaces host = request.getHost().host peer = request.getClientIP() host = six.ensure_str(host) if request.getHeader("x-forwarded-for"): peer = request.getHeader("x-forwarded-for") if peer is None: peer = request.transport.socket.getpeername()[0] peer = six.ensure_str(peer) if peer.startswith("::ffff:"): peer = peer.replace("::ffff:", "") if peer.startswith("fe80::") and "%" in peer: peer = peer.split("%")[0] # Handle all conditions where auth may be skipped/disabled # #1: Auth is disabled and access is from local network if (not request.isSecure() and config.OpenWebif.auth.value is False) or (request.isSecure() and config.OpenWebif.https_auth.value is False): networks = getAllNetworks() if networks: for network in networks: if ipaddress.ip_address(six.text_type(peer)) in ipaddress.ip_network(six.text_type(network), strict=False): return self.resource.getChildWithDefault(path, request) # #2: Auth is disabled and access is from private address space (Usually VPN) and access for VPNs has been granted if (not request.isSecure() and config.OpenWebif.auth.value is False) or (request.isSecure() and config.OpenWebif.https_auth.value is False): if config.OpenWebif.vpn_access.value is True and ipaddress.ip_address(six.text_type(peer)).is_private: return self.resource.getChildWithDefault(path, request) # #3: Access is from localhost and streaming auth is disabled - or - we only want to see our IPv6 (For inadyn-mt) if ((host == "localhost" or host == "127.0.0.1" or host == "::ffff:127.0.0.1" or host == "::1") and not (request.uri.startswith(b"/web/stream?StreamService=") and config.OpenWebif.auth_for_streaming.value) or request.uri == b"/web/getipv6"): return self.resource.getChildWithDefault(path, request) # #4: Web TV is accessing streams and "auths" by parent session id ruser = six.ensure_str(request.getUser()) rpw = six.ensure_str(request.getPassword()) if ruser == "-sid": sid = str(rpw) try: oldsession = site.getSession(sid).sessionNamespaces if "logged" in list(oldsession.keys()) and oldsession["logged"]: session = request.getSession().sessionNamespaces session["logged"] = True return self.resource.getChildWithDefault(path, request) except: # nosec # noqa: E722 pass try: oldsession = sslsite.getSession(sid).sessionNamespaces if "logged" in list(oldsession.keys()) and oldsession["logged"]: session = request.getSession().sessionNamespaces session["logged"] = True return self.resource.getChildWithDefault(path, request) except: # nosec # noqa: E722 pass # If we get to here, no exception applied # Either block with forbidden (If auth is disabled) ... if (not request.isSecure() and config.OpenWebif.auth.value is False) or (request.isSecure() and config.OpenWebif.https_auth.value is False): return resource.ErrorPage(http.FORBIDDEN, 'Forbidden', '403.6 IP address rejected') # ... or auth if "logged" in list(session.keys()) and session["logged"]: return self.resource.getChildWithDefault(path, request) if self.login(ruser, rpw, peer) is False: request.setHeader('WWW-authenticate', 'Basic realm="%s"' % ("OpenWebif")) return resource.ErrorPage(http.UNAUTHORIZED, "Unauthorized", "401 Authentication required") else: session["logged"] = True session["user"] = ruser session["pwd"] = None if self.noShell(ruser): session["pwd"] = rpw return self.resource.getChildWithDefault(path, request) def login(self, user, passwd, peer): if user == "root" and config.OpenWebif.no_root_access.value: # Override "no root" for logins from local/private networks samenet = False networks = getAllNetworks() if networks: for network in networks: if ipaddress.ip_address(six.text_type(peer)) in ipaddress.ip_network(six.text_type(network), strict=False): samenet = True if not (ipaddress.ip_address(six.text_type(peer)).is_private or samenet): return False from crypt import crypt from pwd import getpwnam from spwd import getspnam cpass = None try: cpass = getpwnam(user)[1] except: # nosec # noqa: E722 return False if cpass: if cpass == 'x' or cpass == '*': try: cpass = getspnam(user)[1] except: # nosec # noqa: E722 return False return crypt(passwd, cpass) == cpass return False
[docs]class StopServer: """ Helper class to stop running web servers; we use a class here to reduce use of global variables. Resembles code prior found in HttpdStop et. al. """ server_to_stop = 0 def __init__(self, session, callback=None): self.session = session self.callback = callback def doStop(self): global listener self.server_to_stop = 0 for interface in listener: print("[OpenWebif] Stopping server on port", interface.port) deferred = interface.stopListening() try: self.server_to_stop += 1 deferred.addCallback(self.callbackStopped) except AttributeError: pass listener = [] if self.server_to_stop < 1: self.doCallback() def callbackStopped(self, reason): self.server_to_stop -= 1 if self.server_to_stop < 1: self.doCallback() def doCallback(self): if self.callback is not None: self.callback(self.session)
# # create a self signed SSL certificate if necessary # def installCertificates(session): certGenerator = SSLCertificateGenerator() try: certGenerator.installCertificates() except IOError as e: # Disable https config.OpenWebif.https_enabled.value = False config.OpenWebif.https_enabled.save() # Inform the user session.open(MessageBox, "Cannot install generated SSL-Certifactes for https access\nHttps access is disabled!", MessageBox.TYPE_ERROR) # BJ def BJregisterService(protocol, port): try: from Plugins.Extensions.Bonjour.Bonjour import bonjour service = bonjour.buildService(protocol, port, 'OpenWebif') bonjour.registerService(service, True) except: # nosec # noqa: E722 pass try: servicetype = '_' + protocol + '._tcp' enigma.e2avahi_announce(None, servicetype, port) except: # nosec # noqa: E722 pass