Skip to content

sgn.bottle

Bottle is a fast and simple micro-framework for small web applications. It offers request dispatching (Routes) with URL parameter support, templates, a built-in HTTP Server and adapters for many third party WSGI/HTTP-server and template engines - all in a single file and with no dependencies other than the Python Standard Library.

Homepage and documentation: http://bottlepy.org/

Copyright (c) 2009-2024, Marcel Hellkamp. License: MIT (see LICENSE for details)

AiohttpServer

Bases: AsyncioServerAdapter


              flowchart TD
              sgn.bottle.AiohttpServer[AiohttpServer]
              sgn.bottle.AsyncioServerAdapter[AsyncioServerAdapter]
              sgn.bottle.ServerAdapter[ServerAdapter]

                              sgn.bottle.AsyncioServerAdapter --> sgn.bottle.AiohttpServer
                                sgn.bottle.ServerAdapter --> sgn.bottle.AsyncioServerAdapter
                



              click sgn.bottle.AiohttpServer href "" "sgn.bottle.AiohttpServer"
              click sgn.bottle.AsyncioServerAdapter href "" "sgn.bottle.AsyncioServerAdapter"
              click sgn.bottle.ServerAdapter href "" "sgn.bottle.ServerAdapter"
            

Asynchronous HTTP client/server framework for asyncio https://pypi.python.org/pypi/aiohttp/ https://pypi.org/project/aiohttp-wsgi/

Source code in src/sgn/bottle.py
class AiohttpServer(AsyncioServerAdapter):
    """Server adapter that serves the WSGI handler through aiohttp-wsgi.

    https://pypi.python.org/pypi/aiohttp/
    https://pypi.org/project/aiohttp-wsgi/
    """

    def get_event_loop(self):
        """Create and return a fresh asyncio event loop."""
        import asyncio

        fresh_loop = asyncio.new_event_loop()
        return fresh_loop

    def run(self, handler):
        """Install a new event loop and serve ``handler`` on host/port."""
        import asyncio

        from aiohttp_wsgi.wsgi import serve

        self.loop = self.get_event_loop()
        asyncio.set_event_loop(self.loop)

        # When the BOTTLE_CHILD marker is present in the process environment,
        # stop the loop on SIGINT.
        if "BOTTLE_CHILD" in os.environ:
            import signal

            def _stop_on_sigint(signum, frame):
                # Look up self.loop at signal time, not at registration time.
                return self.loop.stop()

            signal.signal(signal.SIGINT, _stop_on_sigint)

        serve(handler, host=self.host, port=self.port)

AiohttpUVLoopServer

Bases: AiohttpServer


              flowchart TD
              sgn.bottle.AiohttpUVLoopServer[AiohttpUVLoopServer]
              sgn.bottle.AiohttpServer[AiohttpServer]
              sgn.bottle.AsyncioServerAdapter[AsyncioServerAdapter]
              sgn.bottle.ServerAdapter[ServerAdapter]

                              sgn.bottle.AiohttpServer --> sgn.bottle.AiohttpUVLoopServer
                                sgn.bottle.AsyncioServerAdapter --> sgn.bottle.AiohttpServer
                                sgn.bottle.ServerAdapter --> sgn.bottle.AsyncioServerAdapter
                




              click sgn.bottle.AiohttpUVLoopServer href "" "sgn.bottle.AiohttpUVLoopServer"
              click sgn.bottle.AiohttpServer href "" "sgn.bottle.AiohttpServer"
              click sgn.bottle.AsyncioServerAdapter href "" "sgn.bottle.AsyncioServerAdapter"
              click sgn.bottle.ServerAdapter href "" "sgn.bottle.ServerAdapter"
            

uvloop https://github.com/MagicStack/uvloop

Source code in src/sgn/bottle.py
class AiohttpUVLoopServer(AiohttpServer):
    """Variant of :class:`AiohttpServer` that runs on a uvloop event loop.

    https://github.com/MagicStack/uvloop
    """

    def get_event_loop(self):
        """Create and return a new uvloop event loop."""
        import uvloop

        uv_loop = uvloop.new_event_loop()
        return uv_loop

AppEngineServer

Bases: ServerAdapter


              flowchart TD
              sgn.bottle.AppEngineServer[AppEngineServer]
              sgn.bottle.ServerAdapter[ServerAdapter]

                              sgn.bottle.ServerAdapter --> sgn.bottle.AppEngineServer
                


              click sgn.bottle.AppEngineServer href "" "sgn.bottle.AppEngineServer"
              click sgn.bottle.ServerAdapter href "" "sgn.bottle.ServerAdapter"
            

Adapter for Google App Engine.

Source code in src/sgn/bottle.py
class AppEngineServer(ServerAdapter):
    """Adapter for Google App Engine (emits a deprecation notice on use)."""

    quiet = True

    def run(self, handler):
        """Report the deprecation, then run ``handler`` via App Engine's
        ``run_wsgi_app`` utility."""
        depr(
            0,
            13,
            "AppEngineServer no longer required",
            "Configure your application directly in your app.yaml",
        )
        from google.appengine.ext.webapp import util

        # A main() function in the handler script enables 'App Caching',
        # which really improves performance. Make sure one exists, without
        # overriding a main() the script already defines.
        main_module = sys.modules.get("__main__")
        if main_module:
            if not hasattr(main_module, "main"):
                main_module.main = lambda: util.run_wsgi_app(handler)
        util.run_wsgi_app(handler)

AppStack

Bases: list


              flowchart TD
              sgn.bottle.AppStack[AppStack]

              

              click sgn.bottle.AppStack href "" "sgn.bottle.AppStack"
            

A stack-like list. Calling it returns the head of the stack.

Source code in src/sgn/bottle.py
class AppStack(list):
    """A list used as a stack of applications; calling it yields the top."""

    def __call__(self):
        """Return the current default application."""
        return self.default

    def push(self, value=None):
        """Add a new :class:`Bottle` instance to the stack.

        Anything that is not a :class:`Bottle` instance (including the
        default ``None``) is replaced by a freshly created one. The pushed
        application is returned.
        """
        app = value if isinstance(value, Bottle) else Bottle()
        self.append(app)
        return app

    new_app = push

    @property
    def default(self):
        """The application on top of the stack; one is pushed if empty."""
        if not self:
            return self.push()
        return self[-1]

__call__()

Return the current default application.

Source code in src/sgn/bottle.py
def __call__(self):
    """Return the current default application."""
    current_app = self.default
    return current_app

push(value=None)

Add a new :class:Bottle instance to the stack

Source code in src/sgn/bottle.py
def push(self, value=None):
    """Add a new :class:`Bottle` instance to the stack.

    A value that is not a :class:`Bottle` instance is replaced by a new
    one; the application that was appended is returned.
    """
    app = value if isinstance(value, Bottle) else Bottle()
    self.append(app)
    return app

AsyncioServerAdapter

Bases: ServerAdapter


              flowchart TD
              sgn.bottle.AsyncioServerAdapter[AsyncioServerAdapter]
              sgn.bottle.ServerAdapter[ServerAdapter]

                              sgn.bottle.ServerAdapter --> sgn.bottle.AsyncioServerAdapter
                


              click sgn.bottle.AsyncioServerAdapter href "" "sgn.bottle.AsyncioServerAdapter"
              click sgn.bottle.ServerAdapter href "" "sgn.bottle.ServerAdapter"
            

Extend ServerAdapter for adding custom event loop

Source code in src/sgn/bottle.py
class AsyncioServerAdapter(ServerAdapter):
    """Extend ServerAdapter for adding custom event loop"""

    def get_event_loop(self):
        """Hook for subclasses to supply an event loop; returns None here."""
        return None

AutoServer

Bases: ServerAdapter


              flowchart TD
              sgn.bottle.AutoServer[AutoServer]
              sgn.bottle.ServerAdapter[ServerAdapter]

                              sgn.bottle.ServerAdapter --> sgn.bottle.AutoServer
                


              click sgn.bottle.AutoServer href "" "sgn.bottle.AutoServer"
              click sgn.bottle.ServerAdapter href "" "sgn.bottle.ServerAdapter"
            

Untested.

Source code in src/sgn/bottle.py
class AutoServer(ServerAdapter):
    """Run the first adapter from :attr:`adapters` that imports. Untested."""

    adapters = [
        WaitressServer,
        PasteServer,
        TwistedServer,
        CherryPyServer,
        CherootServer,
        WSGIRefServer,
    ]

    def run(self, handler):
        """Try each adapter in order, skipping those that raise ImportError.

        Returns whatever the first usable adapter's ``run`` returns, or
        ``None`` when every adapter failed to import.
        """
        for adapter_cls in self.adapters:
            try:
                server = adapter_cls(self.host, self.port, **self.options)
                return server.run(handler)
            except ImportError:
                continue
        return None

BaseRequest

Bases: object


              flowchart TD
              sgn.bottle.BaseRequest[BaseRequest]

              

              click sgn.bottle.BaseRequest href "" "sgn.bottle.BaseRequest"
            

A wrapper for WSGI environment dictionaries that adds a lot of convenient access methods and properties. Most of them are read-only.

Adding new attributes to a request actually adds them to the environ dictionary (as 'bottle.request.ext.<name>'). This is the recommended way to store and access request-specific data.

Source code in src/sgn/bottle.py
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
1296
1297
1298
1299
1300
1301
1302
1303
1304
1305
1306
1307
1308
1309
1310
1311
1312
1313
1314
1315
1316
1317
1318
1319
1320
1321
1322
1323
1324
1325
1326
1327
1328
1329
1330
1331
1332
1333
1334
1335
1336
1337
1338
1339
1340
1341
1342
1343
1344
1345
1346
1347
1348
1349
1350
1351
1352
1353
1354
1355
1356
1357
1358
1359
1360
1361
1362
1363
1364
1365
1366
1367
1368
1369
1370
1371
1372
1373
1374
1375
1376
1377
1378
1379
1380
1381
1382
1383
1384
1385
1386
1387
1388
1389
1390
1391
1392
1393
1394
1395
1396
1397
1398
1399
1400
1401
1402
1403
1404
1405
1406
1407
1408
1409
1410
1411
1412
1413
1414
1415
1416
1417
1418
1419
1420
1421
1422
1423
1424
1425
1426
1427
1428
1429
1430
1431
1432
1433
1434
1435
1436
1437
1438
1439
1440
1441
1442
1443
1444
1445
1446
1447
1448
1449
1450
1451
1452
1453
1454
1455
1456
1457
1458
1459
1460
1461
1462
1463
1464
1465
1466
1467
1468
1469
1470
1471
1472
1473
1474
1475
1476
1477
1478
1479
1480
1481
1482
1483
1484
1485
1486
1487
1488
1489
1490
1491
1492
1493
1494
1495
1496
1497
1498
1499
1500
1501
1502
1503
1504
1505
1506
1507
1508
1509
1510
1511
1512
1513
1514
1515
1516
1517
1518
1519
1520
1521
1522
1523
1524
1525
1526
1527
1528
1529
1530
1531
1532
1533
1534
1535
1536
1537
1538
1539
1540
1541
1542
1543
1544
1545
1546
1547
1548
1549
1550
1551
1552
1553
1554
1555
1556
1557
1558
1559
1560
1561
1562
1563
1564
1565
1566
1567
1568
1569
1570
1571
1572
1573
1574
1575
1576
1577
1578
1579
1580
1581
1582
1583
1584
1585
1586
1587
1588
1589
1590
1591
1592
1593
1594
1595
1596
1597
1598
1599
1600
1601
1602
1603
1604
1605
1606
1607
1608
1609
1610
1611
1612
1613
1614
1615
1616
1617
1618
1619
1620
1621
1622
1623
1624
1625
1626
1627
1628
1629
1630
1631
1632
1633
1634
1635
1636
1637
1638
1639
1640
1641
1642
1643
1644
1645
1646
1647
1648
1649
1650
1651
1652
1653
1654
1655
1656
1657
1658
1659
1660
1661
1662
1663
1664
1665
1666
1667
1668
1669
1670
1671
1672
1673
1674
1675
1676
1677
1678
1679
1680
1681
1682
1683
1684
1685
1686
1687
1688
1689
1690
1691
1692
1693
1694
1695
1696
1697
1698
1699
1700
1701
1702
1703
1704
1705
1706
1707
1708
1709
1710
1711
1712
1713
1714
1715
1716
1717
1718
1719
1720
1721
1722
1723
1724
1725
1726
1727
1728
1729
1730
1731
1732
1733
1734
1735
1736
1737
1738
1739
1740
1741
1742
1743
1744
1745
1746
1747
1748
1749
class BaseRequest(object):
    """A wrapper for WSGI environment dictionaries that adds a lot of
    convenient access methods and properties. Most of them are read-only.

    Adding new attributes to a request actually adds them to the environ
    dictionary (as 'bottle.request.ext.<name>'). This is the recommended
    way to store and access request-specific data.
    """

    __slots__ = ("environ",)

    #: Maximum size of memory buffer for :attr:`body` in bytes.
    MEMFILE_MAX = 102400

    def __init__(self, environ=None):
        """Wrap a WSGI environ dictionary."""
        #: The wrapped WSGI environ dictionary. This is the only real attribute.
        #: All other attributes actually are read-only properties.
        self.environ = {} if environ is None else environ
        self.environ["bottle.request"] = self

    @DictProperty("environ", "bottle.app", read_only=True)
    def app(self):
        """Bottle application handling this request."""
        raise RuntimeError("This request is not connected to an application.")

    @DictProperty("environ", "bottle.route", read_only=True)
    def route(self):
        """The bottle :class:`Route` object that matches this request."""
        raise RuntimeError("This request is not connected to a route.")

    @DictProperty("environ", "route.url_args", read_only=True)
    def url_args(self):
        """The arguments extracted from the URL."""
        raise RuntimeError("This request is not connected to a route.")

    @property
    def path(self):
        """The value of ``PATH_INFO`` with exactly one prefixed slash (to fix
        broken clients and avoid the "empty path" edge case)."""
        return "/" + self.environ.get("PATH_INFO", "").lstrip("/")

    @property
    def method(self):
        """The ``REQUEST_METHOD`` value as an uppercase string."""
        return self.environ.get("REQUEST_METHOD", "GET").upper()

    @DictProperty("environ", "bottle.request.headers", read_only=True)
    def headers(self):
        """A :class:`WSGIHeaderDict` that provides case-insensitive access to
        HTTP request headers."""
        return WSGIHeaderDict(self.environ)

    def get_header(self, name, default=None):
        """Return the value of a request header, or a given default value."""
        return self.headers.get(name, default)

    @DictProperty("environ", "bottle.request.cookies", read_only=True)
    def cookies(self):
        """Cookies parsed into a :class:`FormsDict`. Signed cookies are NOT
        decoded. Use :meth:`get_cookie` if you expect signed cookies."""
        cookies = SimpleCookie(self.environ.get("HTTP_COOKIE", "")).values()
        return FormsDict((c.key, c.value) for c in cookies)

    def get_cookie(self, key, default=None, secret=None, digestmod=hashlib.sha256):
        """Return the content of a cookie. To read a `Signed Cookie`, the
        `secret` must match the one used to create the cookie (see
        :meth:`Response.set_cookie <BaseResponse.set_cookie>`). If anything goes wrong (missing
        cookie or wrong signature), return a default value."""
        value = self.cookies.get(key)
        if secret:
            # See BaseResponse.set_cookie for details on signed cookies.
            if value and value.startswith("!") and "?" in value:
                sig, msg = map(tob, value[1:].split("?", 1))
                hash = hmac.new(tob(secret), msg, digestmod=digestmod).digest()
                if _lscmp(sig, base64.b64encode(hash)):
                    # NOTE(security): the payload is unpickled. This is only
                    # reached after the HMAC check above succeeded, so the
                    # safety of this call rests entirely on `secret` staying
                    # secret; anyone who knows it can make this unpickle
                    # arbitrary data.
                    dst = pickle.loads(base64.b64decode(msg))
                    if dst and dst[0] == key:
                        return dst[1]
            return default
        return value or default

    @DictProperty("environ", "bottle.request.query", read_only=True)
    def query(self):
        """The :attr:`query_string` parsed into a :class:`FormsDict`. These
        values are sometimes called "URL arguments" or "GET parameters", but
        not to be confused with "URL wildcards" as they are provided by the
        :class:`Router`."""
        get = self.environ["bottle.get"] = FormsDict()
        pairs = _parse_qsl(self.environ.get("QUERY_STRING", ""))
        for key, value in pairs:
            get[key] = value
        return get

    @DictProperty("environ", "bottle.request.forms", read_only=True)
    def forms(self):
        """Form values parsed from an `url-encoded` or `multipart/form-data`
        encoded POST or PUT request body. The result is returned as a
        :class:`FormsDict`. All keys and values are strings. File uploads
        are stored separately in :attr:`files`."""
        forms = FormsDict()
        forms.recode_unicode = self.POST.recode_unicode
        for name, item in self.POST.allitems():
            if not isinstance(item, FileUpload):
                forms[name] = item
        return forms

    @DictProperty("environ", "bottle.request.params", read_only=True)
    def params(self):
        """A :class:`FormsDict` with the combined values of :attr:`query` and
        :attr:`forms`. File uploads are stored in :attr:`files`."""
        params = FormsDict()
        for key, value in self.query.allitems():
            params[key] = value
        for key, value in self.forms.allitems():
            params[key] = value
        return params

    @DictProperty("environ", "bottle.request.files", read_only=True)
    def files(self):
        """File uploads parsed from `multipart/form-data` encoded POST or PUT
        request body. The values are instances of :class:`FileUpload`.

        """
        files = FormsDict()
        files.recode_unicode = self.POST.recode_unicode
        for name, item in self.POST.allitems():
            if isinstance(item, FileUpload):
                files[name] = item
        return files

    @DictProperty("environ", "bottle.request.json", read_only=True)
    def json(self):
        """If the ``Content-Type`` header is ``application/json`` or
        ``application/json-rpc``, this property holds the parsed content
        of the request body. Only requests smaller than :attr:`MEMFILE_MAX`
        are processed to avoid memory exhaustion.
        Invalid JSON raises a 400 error response.
        """
        # Strip whitespace around the media type so that headers such as
        # "application/json ; charset=utf-8" are recognised as JSON too.
        ctype = self.environ.get("CONTENT_TYPE", "").lower().split(";")[0].strip()
        if ctype in ("application/json", "application/json-rpc"):
            b = self._get_body_string(self.MEMFILE_MAX)
            if not b:
                return None
            try:
                return json_loads(b)
            except (ValueError, TypeError) as err:
                raise HTTPError(400, "Invalid JSON", exception=err)
        return None

    def _iter_body(self, read, bufsize):
        """Yield the body in parts of at most `bufsize` bytes, reading no
        more than :attr:`content_length` bytes in total. Stops early if
        `read` returns an empty result."""
        maxread = max(0, self.content_length)
        while maxread:
            part = read(min(maxread, bufsize))
            if not part:
                break
            yield part
            maxread -= len(part)

    @staticmethod
    def _iter_chunked(read, bufsize):
        """Yield the payload of a body sent with chunked transfer encoding.

        Raises :exc:`HTTPError` (400) if a chunk header is longer than
        `bufsize`, has no valid hexadecimal size, or if the input ends or
        deviates from the expected CRLF framing.
        """
        err = HTTPError(400, "Error while parsing chunked transfer body.")
        rn, sem, bs = tob("\r\n"), tob(";"), tob("")
        while True:
            # Read the chunk header byte by byte up to the terminating CRLF.
            header = read(1)
            while header[-2:] != rn:
                c = read(1)
                header += c
                if not c:
                    raise err
                if len(header) > bufsize:
                    raise err
            # Everything after the first ';' is ignored.
            size, _, _ = header.partition(sem)
            try:
                maxread = int(tonat(size.strip()), 16)
            except ValueError:
                raise err
            if maxread == 0:
                # A zero-sized chunk terminates the body.
                break
            buff = bs
            while maxread > 0:
                if not buff:
                    buff = read(min(maxread, bufsize))
                part, buff = buff[:maxread], buff[maxread:]
                if not part:
                    raise err
                yield part
                maxread -= len(part)
            # Every chunk must be followed by CRLF.
            if read(2) != rn:
                raise err

    @DictProperty("environ", "bottle.request.body", read_only=True)
    def _body(self):
        """Read the request body into a seek-able buffer and store it as the
        new ``wsgi.input``. The data is held in a :class:`io.BytesIO` and
        moved to a temporary file once it grows beyond :attr:`MEMFILE_MAX`.
        If the environ has no ``wsgi.input``, an empty buffer is installed."""
        try:
            read_func = self.environ["wsgi.input"].read
        except KeyError:
            self.environ["wsgi.input"] = BytesIO()
            return self.environ["wsgi.input"]
        body_iter = self._iter_chunked if self.chunked else self._iter_body
        body, body_size, is_temp_file = BytesIO(), 0, False
        for part in body_iter(read_func, self.MEMFILE_MAX):
            body.write(part)
            body_size += len(part)
            if not is_temp_file and body_size > self.MEMFILE_MAX:
                # Too large for memory: copy what was buffered so far into
                # a temporary file and keep writing there.
                body, tmp = NamedTemporaryFile(mode="w+b"), body
                body.write(tmp.getvalue())
                del tmp
                is_temp_file = True
        self.environ["wsgi.input"] = body
        body.seek(0)
        return body

    def _get_body_string(self, maxread):
        """Read body into a string. Raise HTTPError(413) on requests that are
        too large."""
        if self.content_length > maxread:
            raise HTTPError(413, "Request entity too large")
        # Read one byte more than allowed to detect oversized bodies whose
        # declared content length did not trigger the check above.
        data = self.body.read(maxread + 1)
        if len(data) > maxread:
            raise HTTPError(413, "Request entity too large")
        return data

    @property
    def body(self):
        """The HTTP request body as a seek-able file-like object. Depending on
        :attr:`MEMFILE_MAX`, this is either a temporary file or a
        :class:`io.BytesIO` instance. Accessing this property for the first
        time reads and replaces the ``wsgi.input`` environ variable.
        Subsequent accesses just do a `seek(0)` on the file object."""
        self._body.seek(0)
        return self._body

    @property
    def chunked(self):
        """True if chunked transfer encoding was used for this request."""
        return "chunked" in self.environ.get("HTTP_TRANSFER_ENCODING", "").lower()

    #: An alias for :attr:`query`.
    GET = query

    @DictProperty("environ", "bottle.request.post", read_only=True)
    def POST(self):
        """The values of :attr:`forms` and :attr:`files` combined into a single
        :class:`FormsDict`. Values are either strings (form values) or
        instances of :class:`FileUpload`.
        """
        post = FormsDict()
        content_type = self.environ.get("CONTENT_TYPE", "")
        content_type, options = _parse_http_header(content_type)[0]
        # We default to application/x-www-form-urlencoded for everything that
        # is not multipart and take the fast path (also: 3.1 workaround)
        if not content_type.startswith("multipart/"):
            body = tonat(self._get_body_string(self.MEMFILE_MAX), "latin1")
            for key, value in _parse_qsl(body):
                post[key] = value
            return post

        post.recode_unicode = False
        charset = options.get("charset", "utf8")
        boundary = options.get("boundary")
        if not boundary:
            raise MultipartError("Invalid content type header, missing boundary")
        parser = _MultipartParser(
            self.body,
            boundary,
            self.content_length,
            mem_limit=self.MEMFILE_MAX,
            memfile_limit=self.MEMFILE_MAX,
            charset=charset,
        )

        for part in parser.parse():
            if not part.filename and part.is_buffered():
                post[part.name] = tonat(part.value, "utf8")
            else:
                post[part.name] = FileUpload(
                    part.file, part.name, part.filename, part.headerlist
                )

        return post

    @property
    def url(self):
        """The full request URI including hostname and scheme. If your app
        lives behind a reverse proxy or load balancer and you get confusing
        results, make sure that the ``X-Forwarded-Host`` header is set
        correctly."""
        return self.urlparts.geturl()

    @DictProperty("environ", "bottle.request.urlparts", read_only=True)
    def urlparts(self):
        """The :attr:`url` string as an :class:`urlparse.SplitResult` tuple.
        The tuple contains (scheme, host, path, query_string and fragment),
        but the fragment is always empty because it is not visible to the
        server."""
        env = self.environ
        http = env.get("HTTP_X_FORWARDED_PROTO") or env.get("wsgi.url_scheme", "http")
        host = env.get("HTTP_X_FORWARDED_HOST") or env.get("HTTP_HOST")
        if not host:
            # HTTP 1.1 requires a Host-header. This is for HTTP/1.0 clients.
            host = env.get("SERVER_NAME", "127.0.0.1")
            port = env.get("SERVER_PORT")
            # Only append the port if it is not the default for the scheme.
            if port and port != ("80" if http == "http" else "443"):
                host += ":" + port
        path = urlquote(self.fullpath)
        return UrlSplitResult(http, host, path, env.get("QUERY_STRING"), "")

    @property
    def fullpath(self):
        """Request path including :attr:`script_name` (if present)."""
        return urljoin(self.script_name, self.path.lstrip("/"))

    @property
    def query_string(self):
        """The raw :attr:`query` part of the URL (everything in between ``?``
        and ``#``) as a string."""
        return self.environ.get("QUERY_STRING", "")

    @property
    def script_name(self):
        """The initial portion of the URL's `path` that was removed by a higher
        level (server or routing middleware) before the application was
        called. This script path is returned with leading and trailing
        slashes."""
        script_name = self.environ.get("SCRIPT_NAME", "").strip("/")
        return "/" + script_name + "/" if script_name else "/"

    def path_shift(self, shift=1):
        """Shift path segments from :attr:`path` to :attr:`script_name` and
        vice versa.

        :param shift: The number of path segments to shift. May be negative
                      to change the shift direction. (default: 1)
        """
        script, path = path_shift(
            self.environ.get("SCRIPT_NAME", "/"), self.path, shift
        )
        self["SCRIPT_NAME"], self["PATH_INFO"] = script, path

    @property
    def content_length(self):
        """The request body length as an integer. The client is responsible to
        set this header. Otherwise, the real length of the body is unknown
        and -1 is returned. In this case, :attr:`body` will be empty."""
        return int(self.environ.get("CONTENT_LENGTH") or -1)

    @property
    def content_type(self):
        """The Content-Type header as a lowercase-string (default: empty)."""
        return self.environ.get("CONTENT_TYPE", "").lower()

    @property
    def is_xhr(self):
        """True if the request was triggered by a XMLHttpRequest. This only
        works with JavaScript libraries that support the `X-Requested-With`
        header (most of the popular libraries do)."""
        requested_with = self.environ.get("HTTP_X_REQUESTED_WITH", "")
        return requested_with.lower() == "xmlhttprequest"

    @property
    def is_ajax(self):
        """Alias for :attr:`is_xhr`. "Ajax" is not the right term."""
        return self.is_xhr

    @property
    def auth(self):
        """HTTP authentication data as a (user, password) tuple. This
        implementation currently supports basic (not digest) authentication
        only. If the authentication happened at a higher level (e.g. in the
        front web-server or a middleware), the password field is None, but
        the user field is looked up from the ``REMOTE_USER`` environ
        variable. On any errors, None is returned."""
        basic = parse_auth(self.environ.get("HTTP_AUTHORIZATION", ""))
        if basic:
            return basic
        ruser = self.environ.get("REMOTE_USER")
        if ruser:
            return (ruser, None)
        return None

    @property
    def remote_route(self):
        """A list of all IPs that were involved in this request, starting with
        the client IP and followed by zero or more proxies. This does only
        work if all proxies support the ``X-Forwarded-For`` header. Note
        that this information can be forged by malicious clients."""
        proxy = self.environ.get("HTTP_X_FORWARDED_FOR")
        if proxy:
            return [ip.strip() for ip in proxy.split(",")]
        remote = self.environ.get("REMOTE_ADDR")
        return [remote] if remote else []

    @property
    def remote_addr(self):
        """The client IP as a string. Note that this information can be forged
        by malicious clients."""
        route = self.remote_route
        return route[0] if route else None

    def copy(self):
        """Return a new :class:`Request` with a shallow :attr:`environ` copy."""
        return Request(self.environ.copy())

    def get(self, key, default=None):
        """Return the environ value for `key`, or `default` if missing."""
        return self.environ.get(key, default)

    def __getitem__(self, key):
        """Return the environ value for `key`."""
        return self.environ[key]

    def __delitem__(self, key):
        """Remove `key` from the environ. The key is assigned an empty value
        first, which goes through :meth:`__setitem__` and therefore clears
        dependent caches and honours the read-only flag."""
        self[key] = ""
        del self.environ[key]

    def __iter__(self):
        """Iterate over the environ keys."""
        return iter(self.environ)

    def __len__(self):
        """Return the number of environ entries."""
        return len(self.environ)

    def keys(self):
        """Return the keys of the environ dictionary."""
        return self.environ.keys()

    def __setitem__(self, key, value):
        """Change an environ value and clear all caches that depend on it.

        Raises :exc:`KeyError` if the environ is flagged as read-only via
        ``bottle.request.readonly``.
        """

        if self.environ.get("bottle.request.readonly"):
            raise KeyError("The environ dictionary is read-only.")

        self.environ[key] = value
        todelete = ()

        # `urlparts` is built from the query string, the host/forwarding
        # headers, the script name/path and the server name/port/scheme, so
        # its cached value has to be dropped whenever one of them changes.
        # Otherwise :attr:`url` keeps returning the old URL (for example
        # after :meth:`path_shift`).
        if key == "wsgi.input":
            todelete = ("body", "forms", "files", "params", "post", "json")
        elif key == "QUERY_STRING":
            todelete = ("query", "params", "urlparts")
        elif key.startswith("HTTP_"):
            todelete = ("headers", "cookies", "urlparts")
        elif key in (
            "SCRIPT_NAME",
            "PATH_INFO",
            "SERVER_NAME",
            "SERVER_PORT",
            "wsgi.url_scheme",
        ):
            todelete = ("urlparts",)

        for cached in todelete:
            self.environ.pop("bottle.request." + cached, None)

    def __repr__(self):
        """Return ``<ClassName: METHOD URL>``."""
        return "<%s: %s %s>" % (self.__class__.__name__, self.method, self.url)

    def __getattr__(self, name):
        """Search in self.environ for additional user defined attributes."""
        try:
            var = self.environ["bottle.request.ext.%s" % name]
            # Stored descriptors are bound to this request on access.
            return var.__get__(self) if hasattr(var, "__get__") else var
        except KeyError:
            raise AttributeError("Attribute %r not defined." % name)

    def __setattr__(self, name, value):
        """Define new attributes that are local to the bound request environment."""
        if name == "environ":
            return object.__setattr__(self, name, value)
        key = "bottle.request.ext.%s" % name
        if hasattr(self, name):
            raise AttributeError("Attribute already defined: %s" % name)
        self.environ[key] = value

    def __delattr__(self, name):
        """Remove a user defined attribute from the request environment."""
        try:
            del self.environ["bottle.request.ext.%s" % name]
        except KeyError:
            raise AttributeError("Attribute not defined: %s" % name)

auth property

HTTP authentication data as a (user, password) tuple. This implementation currently supports basic (not digest) authentication only. If the authentication happened at a higher level (e.g. in the front web-server or a middleware), the password field is None, but the user field is looked up from the REMOTE_USER environ variable. On any errors, None is returned.

body property

The HTTP request body as a seek-able file-like object. Depending on :attr:MEMFILE_MAX, this is either a temporary file or a :class:io.BytesIO instance. Accessing this property for the first time reads and replaces the wsgi.input environ variable. Subsequent accesses just do a seek(0) on the file object.

chunked property

True if chunked transfer encoding was used for this request.

content_length property

The request body length as an integer. The client is responsible to set this header. Otherwise, the real length of the body is unknown and -1 is returned. In this case, :attr:body will be empty.

content_type property

The Content-Type header as a lowercase-string (default: empty).

fullpath property

Request path including :attr:script_name (if present).

is_ajax property

Alias for :attr:is_xhr. "Ajax" is not the right term.

is_xhr property

True if the request was triggered by a XMLHttpRequest. This only works with JavaScript libraries that support the X-Requested-With header (most of the popular libraries do).

method property

The REQUEST_METHOD value as an uppercase string.

path property

The value of PATH_INFO with exactly one prefixed slash (to fix broken clients and avoid the "empty path" edge case).

query_string property

The raw :attr:query part of the URL (everything in between ? and #) as a string.

remote_addr property

The client IP as a string. Note that this information can be forged by malicious clients.

remote_route property

A list of all IPs that were involved in this request, starting with the client IP and followed by zero or more proxies. This only works if all proxies support the X-Forwarded-For header. Note that this information can be forged by malicious clients.

script_name property

The initial portion of the URL's path that was removed by a higher level (server or routing middleware) before the application was called. This script path is returned with leading and trailing slashes.

url property

The full request URI including hostname and scheme. If your app lives behind a reverse proxy or load balancer and you get confusing results, make sure that the X-Forwarded-Host header is set correctly.

POST()

The values of :attr:forms and :attr:files combined into a single :class:FormsDict. Values are either strings (form values) or instances of :class:FileUpload.

Source code in src/sgn/bottle.py
@DictProperty("environ", "bottle.request.post", read_only=True)
def POST(self):
    """Form fields and file uploads of the request body in one
    :class:`FormsDict`. Plain form values are stored as strings, uploads
    as :class:`FileUpload` instances.
    """
    result = FormsDict()
    raw_header = self.environ.get("CONTENT_TYPE", "")
    content_type, options = _parse_http_header(raw_header)[0]

    # Anything that is not multipart is handled as
    # application/x-www-form-urlencoded on the fast path.
    is_multipart = content_type.startswith("multipart/")
    if not is_multipart:
        raw_body = self._get_body_string(self.MEMFILE_MAX)
        for key, value in _parse_qsl(tonat(raw_body, "latin1")):
            result[key] = value
        return result

    result.recode_unicode = False
    charset = options.get("charset", "utf8")
    boundary = options.get("boundary")
    if not boundary:
        raise MultipartError("Invalid content type header, missing boundary")

    limit = self.MEMFILE_MAX
    parser = _MultipartParser(
        self.body,
        boundary,
        self.content_length,
        mem_limit=limit,
        memfile_limit=limit,
        charset=charset,
    )

    for part in parser.parse():
        # Parts with a filename, or parts that are not buffered, become
        # uploads; everything else is a plain text field.
        if part.filename or not part.is_buffered():
            result[part.name] = FileUpload(
                part.file, part.name, part.filename, part.headerlist
            )
        else:
            result[part.name] = tonat(part.value, "utf8")

    return result

__getattr__(name)

Search in self.environ for additional user defined attributes.

Source code in src/sgn/bottle.py
def __getattr__(self, name):
    """Resolve user defined attributes stored in ``self.environ``.

    Values are looked up under the ``bottle.request.ext.<name>`` key. If the
    stored object has a ``__get__`` method it is invoked with this request,
    otherwise the object itself is returned.

    :raises AttributeError: if a :exc:`KeyError` occurs during the lookup.
    """
    try:
        stored = self.environ["bottle.request.ext.%s" % name]
        if hasattr(stored, "__get__"):
            return stored.__get__(self)
        return stored
    except KeyError:
        raise AttributeError("Attribute %r not defined." % name)

__init__(environ=None)

Wrap a WSGI environ dictionary.

Source code in src/sgn/bottle.py
def __init__(self, environ=None):
    """Wrap a WSGI environ dictionary.

    :param environ: The dictionary to wrap. A fresh empty dict is used when
        it is ``None``.
    """
    if environ is None:
        environ = {}
    #: The wrapped WSGI environ dictionary. This is the only real attribute.
    #: All other attributes actually are read-only properties.
    self.environ = environ
    # Store a back-reference to this request inside the environ itself.
    self.environ["bottle.request"] = self

__setattr__(name, value)

Define new attributes that are local to the bound request environment.

Source code in src/sgn/bottle.py
def __setattr__(self, name, value):
    """Store a new attribute inside the bound request environment.

    ``environ`` itself is set as a regular instance attribute. Every other
    name is written to ``self.environ`` under ``bottle.request.ext.<name>``.

    :raises AttributeError: if ``hasattr(self, name)`` is already true.
    """
    if name != "environ":
        if hasattr(self, name):
            raise AttributeError("Attribute already defined: %s" % name)
        self.environ["bottle.request.ext.%s" % name] = value
        return None
    return object.__setattr__(self, name, value)

__setitem__(key, value)

Change an environ value and clear all caches that depend on it.

Source code in src/sgn/bottle.py
def __setitem__(self, key, value):
    """Set an environ value and drop the cached values derived from it.

    :raises KeyError: if the environ is flagged read-only through a truthy
        ``bottle.request.readonly`` entry.
    """
    environ = self.environ
    if environ.get("bottle.request.readonly"):
        raise KeyError("The environ dictionary is read-only.")

    environ[key] = value

    # Pick the cache entries (stored as "bottle.request.<name>") that become
    # stale once this particular key changes.
    if key == "wsgi.input":
        stale = ("body", "forms", "files", "params", "post", "json")
    elif key == "QUERY_STRING":
        stale = ("query", "params")
    elif key.startswith("HTTP_"):
        stale = ("headers", "cookies")
    else:
        stale = ()

    for cache_name in stale:
        environ.pop("bottle.request." + cache_name, None)

app()

Bottle application handling this request.

Source code in src/sgn/bottle.py
@DictProperty("environ", "bottle.app", read_only=True)
def app(self):
    """Bottle application handling this request.

    :raises RuntimeError: whenever this getter body is executed.
    """
    message = "This request is not connected to an application."
    raise RuntimeError(message)

cookies()

Cookies parsed into a :class:FormsDict. Signed cookies are NOT decoded. Use :meth:get_cookie if you expect signed cookies.

Source code in src/sgn/bottle.py
@DictProperty("environ", "bottle.request.cookies", read_only=True)
def cookies(self):
    """Cookies from the ``HTTP_COOKIE`` environ value as a :class:`FormsDict`.

    Signed cookies are NOT decoded here. Use :meth:`get_cookie` if you
    expect signed cookies.
    """
    jar = SimpleCookie(self.environ.get("HTTP_COOKIE", ""))
    pairs = [(morsel.key, morsel.value) for morsel in jar.values()]
    return FormsDict(pairs)

copy()

Return a new :class:Request with a shallow :attr:environ copy.

Source code in src/sgn/bottle.py
def copy(self):
    """Create a new :class:`Request` around a shallow copy of :attr:`environ`."""
    environ_copy = self.environ.copy()
    return Request(environ_copy)

files()

File uploads parsed from multipart/form-data encoded POST or PUT request body. The values are instances of :class:FileUpload.

Source code in src/sgn/bottle.py
@DictProperty("environ", "bottle.request.files", read_only=True)
def files(self):
    """The :class:`FileUpload` entries of :attr:`POST`.

    These are the file uploads parsed from a `multipart/form-data` encoded
    POST or PUT request body.
    """
    parsed = self.POST
    uploads = FormsDict()
    # Mirror the recoding flag of the parsed body on the filtered view.
    uploads.recode_unicode = parsed.recode_unicode
    for name, item in parsed.allitems():
        if isinstance(item, FileUpload):
            uploads[name] = item
    return uploads

forms()

Form values parsed from an url-encoded or multipart/form-data encoded POST or PUT request body. The result is returned as a :class:FormsDict. All keys and values are strings. File uploads are stored separately in :attr:files.

Source code in src/sgn/bottle.py
@DictProperty("environ", "bottle.request.forms", read_only=True)
def forms(self):
    """The entries of :attr:`POST` that are not :class:`FileUpload` objects.

    These are the form values parsed from an `url-encoded` or
    `multipart/form-data` encoded POST or PUT request body. File uploads
    are stored separately in :attr:`files`.
    """
    parsed = self.POST
    values = FormsDict()
    # Mirror the recoding flag of the parsed body on the filtered view.
    values.recode_unicode = parsed.recode_unicode
    for name, item in parsed.allitems():
        if isinstance(item, FileUpload):
            continue
        values[name] = item
    return values

Return the content of a cookie. To read a Signed Cookie, the secret must match the one used to create the cookie (see :meth:Response.set_cookie <BaseResponse.set_cookie>). If anything goes wrong (missing cookie or wrong signature), return a default value.

Source code in src/sgn/bottle.py
def get_cookie(self, key, default=None, secret=None, digestmod=hashlib.sha256):
    """Return the content of a cookie, or `default` if anything goes wrong.

    Without a `secret` the raw cookie value is returned (or `default` when
    it is missing or empty). With a `secret` the cookie must be a
    `Signed Cookie` created with the same secret (see
    :meth:`Response.set_cookie <BaseResponse.set_cookie>`); a missing cookie
    or a wrong signature yields `default`.

    :param key: name of the cookie.
    :param default: value returned when no usable cookie is found.
    :param secret: signature key; enables signed-cookie decoding when truthy.
    :param digestmod: digest passed to :func:`hmac.new` for the signature.
    """
    raw = self.cookies.get(key)
    if not secret:
        return raw or default

    # See BaseResponse.set_cookie for details on signed cookies.
    # Expected layout: "!" + signature + "?" + base64 encoded payload.
    if not (raw and raw.startswith("!") and "?" in raw):
        return default

    sig, msg = map(tob, raw[1:].split("?", 1))
    digest = hmac.new(tob(secret), msg, digestmod=digestmod).digest()
    if not _lscmp(sig, base64.b64encode(digest)):
        return default

    # NOTE: pickle is only reached after the signature check above passed.
    payload = pickle.loads(base64.b64decode(msg))
    if payload and payload[0] == key:
        return payload[1]
    return default

get_header(name, default=None)

Return the value of a request header, or a given default value.

Source code in src/sgn/bottle.py
def get_header(self, name, default=None):
    """Look up a request header in :attr:`headers`.

    :param name: header name to look up.
    :param default: value returned when the header is not present.
    """
    request_headers = self.headers
    return request_headers.get(name, default)

headers()

A :class:WSGIHeaderDict that provides case-insensitive access to HTTP request headers.

Source code in src/sgn/bottle.py
@DictProperty("environ", "bottle.request.headers", read_only=True)
def headers(self):
    """HTTP request headers as a :class:`WSGIHeaderDict` built from
    :attr:`environ`, providing case-insensitive access."""
    header_view = WSGIHeaderDict(self.environ)
    return header_view

json()

If the Content-Type header is application/json or application/json-rpc, this property holds the parsed content of the request body. Only requests smaller than :attr:MEMFILE_MAX are processed to avoid memory exhaustion. Invalid JSON raises a 400 error response.

Source code in src/sgn/bottle.py
@DictProperty("environ", "bottle.request.json", read_only=True)
def json(self):
    """If the ``Content-Type`` header is ``application/json`` or
    ``application/json-rpc``, this property holds the parsed content
    of the request body. Only requests smaller than :attr:`MEMFILE_MAX`
    are processed to avoid memory exhaustion.

    Returns ``None`` for any other content type and for an empty body.

    :raises HTTPError: with status 400 if the body is not valid JSON.
    """
    raw_type = self.environ.get("CONTENT_TYPE", "")
    # Only the media type counts; parameters such as charset are ignored.
    # Whitespace around the media type is legal (e.g.
    # "application/json ; charset=utf-8"), so strip it before comparing.
    ctype = raw_type.lower().split(";")[0].strip()
    if ctype in ("application/json", "application/json-rpc"):
        b = self._get_body_string(self.MEMFILE_MAX)
        if not b:
            return None
        try:
            return json_loads(b)
        except (ValueError, TypeError) as err:
            raise HTTPError(400, "Invalid JSON", exception=err)
    return None

params()

A :class:FormsDict with the combined values of :attr:query and :attr:forms. File uploads are stored in :attr:files.

Source code in src/sgn/bottle.py
@DictProperty("environ", "bottle.request.params", read_only=True)
def params(self):
    """The items of :attr:`query` followed by the items of :attr:`forms`,
    merged into one new :class:`FormsDict`. File uploads are stored in
    :attr:`files`."""
    merged = FormsDict()
    for source in (self.query, self.forms):
        for key, value in source.allitems():
            merged[key] = value
    return merged

path_shift(shift=1)

Shift path segments from :attr:path to :attr:script_name and vice versa.

:param shift: The number of path segments to shift. May be negative to change the shift direction. (default: 1)

Source code in src/sgn/bottle.py
def path_shift(self, shift=1):
    """Move path segments between :attr:`path` and :attr:`script_name`.

    The new values are written back to ``SCRIPT_NAME`` and ``PATH_INFO``
    through item assignment on this request.

    :param shift: The number of path segments to shift. May be negative
                  to change the shift direction. (default: 1)
    """
    current_script = self.environ.get("SCRIPT_NAME", "/")
    # Delegates to the module-level ``path_shift`` helper.
    new_script, new_path = path_shift(current_script, self.path, shift)
    self["SCRIPT_NAME"] = new_script
    self["PATH_INFO"] = new_path

query()

The :attr:query_string parsed into a :class:FormsDict. These values are sometimes called "URL arguments" or "GET parameters", but not to be confused with "URL wildcards" as they are provided by the :class:Router.

Source code in src/sgn/bottle.py
@DictProperty("environ", "bottle.request.query", read_only=True)
def query(self):
    """The ``QUERY_STRING`` environ value parsed into a :class:`FormsDict`.

    These values are sometimes called "URL arguments" or "GET parameters",
    but not to be confused with "URL wildcards" as they are provided by the
    :class:`Router`. The resulting dict is also stored in the environ under
    ``bottle.get``.
    """
    parsed = FormsDict()
    self.environ["bottle.get"] = parsed
    raw_query = self.environ.get("QUERY_STRING", "")
    for key, value in _parse_qsl(raw_query):
        parsed[key] = value
    return parsed

route()

The bottle :class:Route object that matches this request.

Source code in src/sgn/bottle.py
@DictProperty("environ", "bottle.route", read_only=True)
def route(self):
    """The bottle :class:`Route` object that matches this request.

    :raises RuntimeError: whenever this getter body is executed.
    """
    message = "This request is not connected to a route."
    raise RuntimeError(message)

url_args()

The arguments extracted from the URL.

Source code in src/sgn/bottle.py
@DictProperty("environ", "route.url_args", read_only=True)
def url_args(self):
    """The arguments extracted from the URL.

    :raises RuntimeError: whenever this getter body is executed.
    """
    message = "This request is not connected to a route."
    raise RuntimeError(message)

urlparts()

The :attr:url string as an :class:urlparse.SplitResult tuple. The tuple contains (scheme, host, path, query_string and fragment), but the fragment is always empty because it is not visible to the server.

Source code in src/sgn/bottle.py
@DictProperty("environ", "bottle.request.urlparts", read_only=True)
def urlparts(self):
    """The request URL split into a :class:`urlparse.SplitResult` tuple.

    The tuple contains (scheme, host, path, query_string and fragment),
    but the fragment is always empty because it is not visible to the
    server. ``X-Forwarded-Proto`` and ``X-Forwarded-Host`` take precedence
    over the values reported by the server itself.
    """
    environ = self.environ
    scheme = environ.get("HTTP_X_FORWARDED_PROTO") or environ.get(
        "wsgi.url_scheme", "http"
    )
    netloc = environ.get("HTTP_X_FORWARDED_HOST") or environ.get("HTTP_HOST")
    if not netloc:
        # HTTP 1.1 requires a Host-header. This is for HTTP/1.0 clients.
        netloc = environ.get("SERVER_NAME", "127.0.0.1")
        port = environ.get("SERVER_PORT")
        default_port = "80" if scheme == "http" else "443"
        # Only spell out the port when it is not the scheme's default.
        if port and port != default_port:
            netloc += ":" + port
    quoted_path = urlquote(self.fullpath)
    query_string = environ.get("QUERY_STRING")
    return UrlSplitResult(scheme, netloc, quoted_path, query_string, "")

BaseResponse

Bases: object


              flowchart TD
              sgn.bottle.BaseResponse[BaseResponse]

              

              click sgn.bottle.BaseResponse href "" "sgn.bottle.BaseResponse"
            

Storage class for a response body as well as headers and cookies.

This class does support dict-like case-insensitive item-access to headers, but is NOT a dict. Most notably, iterating over a response yields parts of the body and not the headers.

Source code in src/sgn/bottle.py
class BaseResponse(object):
    """Storage class for a response body as well as headers and cookies.

    This class does support dict-like case-insensitive item-access to
    headers, but is NOT a dict. Most notably, iterating over a response
    yields parts of the body and not the headers.
    """

    # Status used by __init__ when no explicit status is passed.
    default_status = 200
    # Content-Type that `headerlist` adds when no such header was set.
    default_content_type = "text/html; charset=UTF-8"

    # Header denylist for specific response codes
    # (rfc2616 section 10.2.3 and 10.3.5)
    # Maps a status code to the header names `headerlist` drops for it.
    bad_headers = {
        204: frozenset(("Content-Type", "Content-Length")),
        304: frozenset(
            (
                "Allow",
                "Content-Encoding",
                "Content-Language",
                "Content-Length",
                "Content-Range",
                "Content-Type",
                "Content-Md5",
                "Last-Modified",
            )
        ),
    }

    def __init__(self, body="", status=None, headers=None, **more_headers):
        """Create a new response object.

        :param body: The response body as one of the supported types.
        :param status: Either an HTTP status code (e.g. 200) or a status line
                       including the reason phrase (e.g. '200 OK').
        :param headers: A dictionary or a list of name-value pairs.

        Additional keyword arguments are added to the list of headers.
        Underscores in the header name are replaced with dashes.
        """
        # The cookie jar is created lazily by set_cookie().
        self._cookies = None
        # Maps a header key to the list of values set for it.
        self._headers = {}
        self.body = body
        # Goes through the `status` property, which validates the value.
        self.status = status or self.default_status
        if headers:
            if isinstance(headers, dict):
                headers = headers.items()
            for name, value in headers:
                self.add_header(name, value)
        if more_headers:
            for name, value in more_headers.items():
                self.add_header(name, value)

    def copy(self, cls=None):
        """Returns a copy of self.

        Status, headers and cookies are copied; the body is not (the new
        object is created by calling ``cls()`` without arguments).

        :param cls: class of the copy, must be a subclass of
                    :class:`BaseResponse` (default: :class:`BaseResponse`).
        """
        cls = cls or BaseResponse
        assert issubclass(cls, BaseResponse)
        copy = cls()
        copy.status = self.status
        # v[:] copies each value list so both objects can change independently.
        copy._headers = dict((k, v[:]) for (k, v) in self._headers.items())
        if self._cookies:
            cookies = copy._cookies = SimpleCookie()
            for k, v in self._cookies.items():
                cookies[k] = v.value
                cookies[k].update(v)  # also copy cookie attributes
        return copy

    def __iter__(self):
        """Iterate over the response body (not the headers)."""
        return iter(self.body)

    def close(self):
        """Call ``close()`` on the body if it has such a method."""
        if hasattr(self.body, "close"):
            self.body.close()

    @property
    def status_line(self):
        """The HTTP status line as a string (e.g. ``404 Not Found``)."""
        return self._status_line

    @property
    def status_code(self):
        """The HTTP status code as an integer (e.g. 404)."""
        return self._status_code

    def _set_status(self, status):
        """Setter of the `status` property.

        Accepts an integer code or a string that contains a space (code
        followed by a reason phrase) and stores both ``_status_code`` and
        ``_status_line``.

        :raises ValueError: for a string without a space, a string containing
            control characters, a non-numeric code, or a code outside
            100-999.
        """
        if isinstance(status, int):
            # The reason phrase comes from the lookup table (may be None).
            code, status = status, _HTTP_STATUS_LINES.get(status)
        elif " " in status:
            if "\n" in status or "\r" in status or "\0" in status:
                raise ValueError("Status line must not include control chars.")
            status = status.strip()
            code = int(status.split()[0])
        else:
            raise ValueError("String status line without a reason phrase.")
        if not 100 <= code <= 999:
            raise ValueError("Status code out of range.")
        self._status_code = code
        # Codes without a known reason phrase become "<code> Unknown".
        self._status_line = str(status or ("%d Unknown" % code))

    def _get_status(self):
        """Getter of the `status` property: the current status line."""
        return self._status_line

    status = property(
        _get_status,
        _set_status,
        None,
        """ A writeable property to change the HTTP response status. It accepts
            either a numeric code (100-999) or a string with a custom reason
            phrase (e.g. "404 Brain not found"). Both :data:`status_line` and
            :data:`status_code` are updated accordingly. The return value is
            always a status string. """,
    )
    # The helper functions are only reachable through the property from here on.
    del _get_status, _set_status

    @property
    def headers(self):
        """An instance of :class:`HeaderDict`, a case-insensitive dict-like
        view on the response headers."""
        hdict = HeaderDict()
        # Share the storage instead of copying it, so the view stays in sync.
        hdict.dict = self._headers
        return hdict

    def __contains__(self, name):
        """Return True if a header with that name is set."""
        return _hkey(name) in self._headers

    def __delitem__(self, name):
        """Remove all values of a header. Raises KeyError if it is not set."""
        del self._headers[_hkey(name)]

    def __getitem__(self, name):
        """Return the last value of a header. Raises KeyError if it is not set."""
        return self._headers[_hkey(name)][-1]

    def __setitem__(self, name, value):
        """Set a header, replacing all values previously stored for it."""
        self._headers[_hkey(name)] = [_hval(value)]

    def get_header(self, name, default=None):
        """Return the value of a previously defined header. If there is no
        header with that name, return a default value."""
        # With several values stored for one header, the last one wins.
        return self._headers.get(_hkey(name), [default])[-1]

    def set_header(self, name, value):
        """Create a new response header, replacing any previously defined
        headers with the same name."""
        self._headers[_hkey(name)] = [_hval(value)]

    def add_header(self, name, value):
        """Add an additional response header, not removing duplicates."""
        self._headers.setdefault(_hkey(name), []).append(_hval(value))

    def iter_headers(self):
        """Yield (header, value) tuples, skipping headers that are not
        allowed with the current response status code."""
        # NOTE(review): this returns the `headerlist` list itself rather than
        # a generator; callers can still iterate over it.
        return self.headerlist

    def _wsgi_status_line(self):
        """WSGI conform status line (latin1-encodeable)"""
        if py3k:
            # Re-interpret the UTF-8 bytes as latin1 text.
            return self._status_line.encode("utf8").decode("latin1")
        return self._status_line

    @property
    def headerlist(self):
        """WSGI conform list of (header, value) tuples."""
        out = []
        headers = list(self._headers.items())
        # presumably keys are stored in the form produced by _hkey, so that
        # this literal matches a header set by the user -- TODO confirm
        if "Content-Type" not in self._headers:
            headers.append(("Content-Type", [self.default_content_type]))
        # Drop headers that are denied for the current status code.
        if self._status_code in self.bad_headers:
            bad_headers = self.bad_headers[self._status_code]
            headers = [h for h in headers if h[0] not in bad_headers]
        # Flatten {name: [values]} into one (name, value) pair per value.
        out += [(name, val) for (name, vals) in headers for val in vals]
        if self._cookies:
            for c in self._cookies.values():
                out.append(("Set-Cookie", _hval(c.OutputString())))
        if py3k:
            # Re-interpret the UTF-8 bytes of each value as latin1 text.
            out = [(k, v.encode("utf8").decode("latin1")) for (k, v) in out]
        return out

    # Header-backed attributes; see HeaderProperty for the exact semantics.
    content_type = HeaderProperty("Content-Type")
    content_length = HeaderProperty("Content-Length", reader=int, default=-1)
    expires = HeaderProperty(
        "Expires",
        reader=lambda x: datetime.fromtimestamp(parse_date(x), UTC),
        writer=lambda x: http_date(x),
    )

    @property
    def charset(self, default="UTF-8"):
        """Return the charset specified in the content-type header (default: utf8)."""
        # Property access never passes `default`, so it is always "UTF-8".
        if "charset=" in self.content_type:
            return self.content_type.split("charset=")[-1].split(";")[0].strip()
        return default

    def set_cookie(self, name, value, secret=None, digestmod=hashlib.sha256, **options):
        """Create a new cookie or replace an old one. If the `secret` parameter is
        set, create a `Signed Cookie` (described below).

        :param name: the name of the cookie.
        :param value: the value of the cookie.
        :param secret: a signature key required for signed cookies.
        :param digestmod: digest passed to :func:`hmac.new` when signing
          (default: :func:`hashlib.sha256`).

        Additionally, this method accepts all RFC 2109 attributes that are
        supported by :class:`cookie.Morsel`, including:

        :param maxage: maximum age in seconds. (default: None)
        :param expires: a datetime object or UNIX timestamp. (default: None)
        :param domain: the domain that is allowed to read the cookie.
          (default: current domain)
        :param path: limits the cookie to a given path (default: current path)
        :param secure: limit the cookie to HTTPS connections (default: off).
        :param httponly: prevents client-side JavaScript from reading this
          cookie (default: off, requires Python 2.6 or newer).
        :param samesite: Control or disable third-party use for this cookie.
          Possible values: `lax`, `strict` or `none` (default).

        If neither `expires` nor `maxage` is set (default), the cookie will
        expire at the end of the browser session (as soon as the browser
        window is closed).

        Signed cookies may store any pickle-able object and are
        cryptographically signed to prevent manipulation. Keep in mind that
        cookies are limited to 4kb in most browsers.

        Warning: Pickle is a potentially dangerous format. If an attacker
        gains access to the secret key, they could forge cookies that execute
        code on server side if unpickled. Using pickle is discouraged and
        support for it will be removed in later versions of bottle.

        Warning: Signed cookies are not encrypted (the client can still see
        the content) and not copy-protected (the client can restore an old
        cookie). The main intention is to make pickling and unpickling
        safe, not to store secret information at client side.

        :raises TypeError: for a non-string value without a secret.
        :raises ValueError: if name and value together exceed 3800 characters.
        :raises CookieError: for an invalid `samesite` value.
        """
        if not self._cookies:
            self._cookies = SimpleCookie()

        # Monkey-patch Cookie lib to support 'SameSite' parameter
        # https://tools.ietf.org/html/draft-west-first-party-cookies-07#section-4.1
        if py < (3, 8, 0):
            Morsel._reserved.setdefault("samesite", "SameSite")

        if secret:
            if not isinstance(value, basestring):
                depr(
                    0,
                    13,
                    "Pickling of arbitrary objects into cookies is " "deprecated.",
                    "Only store strings in cookies. " "JSON strings are fine, too.",
                )
            # Signed layout: "!" + base64(hmac) + "?" + base64(pickle([name, value]))
            encoded = base64.b64encode(pickle.dumps([name, value], -1))
            sig = base64.b64encode(
                hmac.new(tob(secret), encoded, digestmod=digestmod).digest()
            )
            value = touni(tob("!") + sig + tob("?") + encoded)
        elif not isinstance(value, basestring):
            raise TypeError("Secret key required for non-string cookies.")

        # Cookie size plus options must not exceed 4kb.
        if len(name) + len(value) > 3800:
            raise ValueError("Content does not fit into a cookie.")

        self._cookies[name] = value

        # Translate the keyword options into Morsel attributes. Note that
        # `value` is reused as the option value from here on.
        for key, value in options.items():
            if key in ("max_age", "maxage"):  # 'maxage' variant added in 0.13
                key = "max-age"
                if isinstance(value, timedelta):
                    value = value.seconds + value.days * 24 * 3600
            if key == "expires":
                value = http_date(value)
            if key in ("same_site", "samesite"):  # 'samesite' variant added in 0.13
                key, value = "samesite", (value or "none").lower()
                if value not in ("lax", "strict", "none"):
                    raise CookieError("Invalid value for SameSite")
            # Falsy flag attributes are left out instead of being stored.
            if key in ("secure", "httponly") and not value:
                continue
            self._cookies[name][key] = value

    def delete_cookie(self, key, **kwargs):
        """Delete a cookie. Be sure to use the same `domain` and `path`
        settings as used to create the cookie."""
        # Overwrite the cookie with an empty value, max_age=-1 and expires=0.
        kwargs["max_age"] = -1
        kwargs["expires"] = 0
        self.set_cookie(key, "", **kwargs)

    def __repr__(self):
        """Return the entries of `headerlist`, one ``Name: value`` per line."""
        out = ""
        for name, value in self.headerlist:
            out += "%s: %s\n" % (name.title(), value.strip())
        return out

charset property

Return the charset specified in the content-type header (default: utf8).

headerlist property

WSGI conform list of (header, value) tuples.

headers property

An instance of :class:HeaderDict, a case-insensitive dict-like view on the response headers.

status_code property

The HTTP status code as an integer (e.g. 404).

status_line property

The HTTP status line as a string (e.g. 404 Not Found).

__init__(body='', status=None, headers=None, **more_headers)

Create a new response object.

:param body: The response body as one of the supported types. :param status: Either an HTTP status code (e.g. 200) or a status line including the reason phrase (e.g. '200 OK'). :param headers: A dictionary or a list of name-value pairs.

Additional keyword arguments are added to the list of headers. Underscores in the header name are replaced with dashes.

Source code in src/sgn/bottle.py
def __init__(self, body="", status=None, headers=None, **more_headers):
    """Set up a new response object.

    :param body: The response body as one of the supported types.
    :param status: Either an HTTP status code (e.g. 200) or a status line
                   including the reason phrase (e.g. '200 OK'). Falls back
                   to ``default_status`` when falsy.
    :param headers: A dictionary or a list of name-value pairs.

    Additional keyword arguments are added to the list of headers.
    Underscores in the header name are replaced with dashes.
    """
    self._cookies = None
    self._headers = {}
    self.body = body
    self.status = status or self.default_status

    if headers:
        # Dicts are consumed as (name, value) items, anything else as-is.
        pairs = headers.items() if isinstance(headers, dict) else headers
        for header_name, header_value in pairs:
            self.add_header(header_name, header_value)

    for header_name, header_value in more_headers.items():
        self.add_header(header_name, header_value)

add_header(name, value)

Add an additional response header, not removing duplicates.

Source code in src/sgn/bottle.py
def add_header(self, name, value):
    """Append another value for a response header, keeping existing ones."""
    header_key = _hkey(name)
    values = self._headers.setdefault(header_key, [])
    values.append(_hval(value))

copy(cls=None)

Returns a copy of self.

Source code in src/sgn/bottle.py
def copy(self, cls=None):
    """Return a copy of this response.

    Status, headers and cookies are copied onto a new object created by
    calling ``cls()`` without arguments.

    :param cls: class of the copy, must be a subclass of
                :class:`BaseResponse` (default: :class:`BaseResponse`).
    """
    target_cls = cls or BaseResponse
    assert issubclass(target_cls, BaseResponse)
    clone = target_cls()
    clone.status = self.status
    # Copy each value list so the two responses do not share header state.
    clone._headers = {key: values[:] for key, values in self._headers.items()}
    if self._cookies:
        jar = clone._cookies = SimpleCookie()
        for cookie_name, morsel in self._cookies.items():
            jar[cookie_name] = morsel.value
            jar[cookie_name].update(morsel)  # also copy cookie attributes
    return clone

Delete a cookie. Be sure to use the same domain and path settings as used to create the cookie.

Source code in src/sgn/bottle.py
def delete_cookie(self, key, **kwargs):
    """Delete a cookie by overwriting it with an empty value, ``max_age=-1``
    and ``expires=0``.

    Be sure to use the same `domain` and `path` settings as used to create
    the cookie.
    """
    kwargs.update(max_age=-1, expires=0)
    self.set_cookie(key, "", **kwargs)

get_header(name, default=None)

Return the value of a previously defined header. If there is no header with that name, return a default value.

Source code in src/sgn/bottle.py
def get_header(self, name, default=None):
    """Return the last value stored for a response header.

    :param name: header name to look up.
    :param default: value returned if there is no header with that name.
    """
    header_key = _hkey(name)
    if header_key in self._headers:
        return self._headers[header_key][-1]
    return default

iter_headers()

Yield (header, value) tuples, skipping headers that are not allowed with the current response status code.

Source code in src/sgn/bottle.py
def iter_headers(self):
    """Return the (header, value) tuples of :attr:`headerlist`, which skips
    headers that are not allowed with the current response status code."""
    header_pairs = self.headerlist
    return header_pairs

Create a new cookie or replace an old one. If the secret parameter is set, create a Signed Cookie (described below).

:param name: the name of the cookie. :param value: the value of the cookie. :param secret: a signature key required for signed cookies.

Additionally, this method accepts all RFC 2109 attributes that are supported by :class:cookie.Morsel, including:

:param maxage: maximum age in seconds. (default: None) :param expires: a datetime object or UNIX timestamp. (default: None) :param domain: the domain that is allowed to read the cookie. (default: current domain) :param path: limits the cookie to a given path (default: current path) :param secure: limit the cookie to HTTPS connections (default: off). :param httponly: prevents client-side javascript to read this cookie (default: off, requires Python 2.6 or newer). :param samesite: Control or disable third-party use for this cookie. Possible values: lax, strict or none (default).

If neither expires nor maxage is set (default), the cookie will expire at the end of the browser session (as soon as the browser window is closed).

Signed cookies may store any pickle-able object and are cryptographically signed to prevent manipulation. Keep in mind that cookies are limited to 4kb in most browsers.

Warning: Pickle is a potentially dangerous format. If an attacker gains access to the secret key, he could forge cookies that execute code on server side if unpickled. Using pickle is discouraged and support for it will be removed in later versions of bottle.

Warning: Signed cookies are not encrypted (the client can still see the content) and not copy-protected (the client can restore an old cookie). The main intention is to make pickling and unpickling safe, not to store secret information at client side.

Source code in src/sgn/bottle.py
def set_cookie(self, name, value, secret=None, digestmod=hashlib.sha256, **options):
    """Create a new cookie or replace an old one. If the `secret` parameter is
    set, create a `Signed Cookie` (described below).

    :param name: the name of the cookie.
    :param value: the value of the cookie.
    :param secret: a signature key required for signed cookies.
    :param digestmod: digest passed to :func:`hmac.new` when signing the
      cookie (default: :func:`hashlib.sha256`).

    Additionally, this method accepts all RFC 2109 attributes that are
    supported by :class:`cookie.Morsel`, including:

    :param maxage: maximum age in seconds. (default: None)
    :param expires: a datetime object or UNIX timestamp. (default: None)
    :param domain: the domain that is allowed to read the cookie.
      (default: current domain)
    :param path: limits the cookie to a given path (default: current path)
    :param secure: limit the cookie to HTTPS connections (default: off).
    :param httponly: prevents client-side JavaScript from reading this
      cookie (default: off, requires Python 2.6 or newer).
    :param samesite: Control or disable third-party use for this cookie.
      Possible values: `lax`, `strict` or `none` (default).

    If neither `expires` nor `maxage` is set (default), the cookie will
    expire at the end of the browser session (as soon as the browser
    window is closed).

    Signed cookies may store any pickle-able object and are
    cryptographically signed to prevent manipulation. Keep in mind that
    cookies are limited to 4kb in most browsers.

    Warning: Pickle is a potentially dangerous format. If an attacker
    gains access to the secret key, they could forge cookies that execute
    code on server side if unpickled. Using pickle is discouraged and
    support for it will be removed in later versions of bottle.

    Warning: Signed cookies are not encrypted (the client can still see
    the content) and not copy-protected (the client can restore an old
    cookie). The main intention is to make pickling and unpickling
    safe, not to store secret information at client side.

    :raises TypeError: if `value` is not a string and no `secret` is given.
    :raises ValueError: if `name` plus the (possibly signed) `value` is
      longer than 3800 characters.
    :raises CookieError: if `samesite` is not `lax`, `strict` or `none`.
    """
    # Create the cookie container lazily, on first use.
    if not self._cookies:
        self._cookies = SimpleCookie()

    # Monkey-patch Cookie lib to support 'SameSite' parameter
    # https://tools.ietf.org/html/draft-west-first-party-cookies-07#section-4.1
    if py < (3, 8, 0):
        Morsel._reserved.setdefault("samesite", "SameSite")

    if secret:
        # Non-string values are still accepted here, but flagged as deprecated.
        if not isinstance(value, basestring):
            depr(
                0,
                13,
                "Pickling of arbitrary objects into cookies is " "deprecated.",
                "Only store strings in cookies. " "JSON strings are fine, too.",
            )
        # Signed layout: "!" + base64(HMAC(encoded)) + "?" + encoded, where
        # encoded is the base64 of the pickled [name, value] pair.
        encoded = base64.b64encode(pickle.dumps([name, value], -1))
        sig = base64.b64encode(
            hmac.new(tob(secret), encoded, digestmod=digestmod).digest()
        )
        value = touni(tob("!") + sig + tob("?") + encoded)
    elif not isinstance(value, basestring):
        raise TypeError("Secret key required for non-string cookies.")

    # Cookie size plus options must not exceed 4kb.
    if len(name) + len(value) > 3800:
        raise ValueError("Content does not fit into a cookie.")

    self._cookies[name] = value

    # Normalise each option's key/value before storing it on the cookie entry.
    # Note: the loop variable deliberately reuses the name `value`; the cookie
    # value itself has already been stored above.
    for key, value in options.items():
        if key in ("max_age", "maxage"):  # 'maxage' variant added in 0.13
            key = "max-age"
            if isinstance(value, timedelta):
                # Whole seconds only; the microseconds part is not included.
                value = value.seconds + value.days * 24 * 3600
        if key == "expires":
            value = http_date(value)
        if key in ("same_site", "samesite"):  # 'samesite' variant added in 0.13
            # A falsy value is treated as "none".
            key, value = "samesite", (value or "none").lower()
            if value not in ("lax", "strict", "none"):
                raise CookieError("Invalid value for SameSite")
        # Falsy 'secure'/'httponly' flags are skipped rather than stored.
        if key in ("secure", "httponly") and not value:
            continue
        self._cookies[name][key] = value

set_header(name, value)

Create a new response header, replacing any previously defined headers with the same name.

Source code in src/sgn/bottle.py
def set_header(self, name, value):
    """Set a response header, discarding any values previously stored
    under the same name."""
    # The value is normalised before the key, then stored as a one-item list.
    normalized_value = _hval(value)
    self._headers[_hkey(name)] = [normalized_value]

BaseTemplate

Bases: object


              flowchart TD
              sgn.bottle.BaseTemplate[BaseTemplate]

              

              click sgn.bottle.BaseTemplate href "" "sgn.bottle.BaseTemplate"
            

Base class and minimal API for template adapters

Source code in src/sgn/bottle.py
class BaseTemplate(object):
    """Common base class that defines the minimal template adapter API."""

    extensions = ["tpl", "html", "thtml", "stpl"]
    settings = {}  # used in prepare()
    defaults = {}  # used in render()

    def __init__(
        self, source=None, name=None, lookup=None, encoding="utf8", **settings
    ):
        """Create a new template.

        When ``source`` (a string or an object with ``read()``) is missing,
        ``name`` is resolved to a file via :meth:`search`. After
        construction either ``self.source`` or ``self.filename`` is set.

        :param source: template text, or an object providing ``read()``
          (and optionally a ``filename`` attribute).
        :param name: template name used for the file lookup.
        :param lookup: directories to search; stored as absolute paths.
        :param encoding: encoding to use for byte strings or files.
        :param settings: engine-specific settings, merged over a copy of
          the class-level ``settings`` and passed to :meth:`prepare`.
        :raises TemplateError: if the named template cannot be found or
          neither a source nor a file is available.
        """
        self.name = name

        if hasattr(source, "read"):
            self.source = source.read()
        else:
            self.source = source

        if hasattr(source, "filename"):
            self.filename = source.filename
        else:
            self.filename = None

        if lookup:
            self.lookup = [os.path.abspath(entry) for entry in lookup]
        else:
            self.lookup = []

        self.encoding = encoding

        # Work on a private copy so the class-level dict stays untouched.
        merged = self.settings.copy()
        merged.update(settings)
        self.settings = merged

        if not self.source:
            if self.name:
                self.filename = self.search(self.name, self.lookup)
                if not self.filename:
                    raise TemplateError("Template %s not found." % repr(name))
            if not self.filename:
                raise TemplateError("No template specified.")

        self.prepare(**self.settings)

    @classmethod
    def search(cls, name, lookup=None):
        """Locate ``name`` inside the ``lookup`` directories.

        Each directory is tried with the plain name first and then with
        every entry of ``cls.extensions`` appended. The first existing file
        is returned; ``None`` is returned when nothing matches. Candidates
        that resolve outside of a lookup directory are ignored.
        """
        if not lookup:
            raise depr(
                0,
                12,
                "Empty template lookup path.",
                "Configure a template lookup path.",
            )

        if os.path.isabs(name):
            raise depr(
                0,
                12,
                "Use of absolute path for template name.",
                "Refer to templates with names or paths relative to the lookup path.",
            )

        for directory in lookup:
            root = os.path.abspath(directory) + os.sep
            candidate = os.path.abspath(os.path.join(root, name))
            if not candidate.startswith(root):
                # Resolved path left the lookup directory (e.g. via "..").
                continue
            if os.path.isfile(candidate):
                return candidate
            for ext in cls.extensions:
                with_ext = "%s.%s" % (candidate, ext)
                if os.path.isfile(with_ext):
                    return with_ext
        return None

    @classmethod
    def global_config(cls, key, *args):
        """Read or write a class-wide setting.

        With one extra positional argument the setting ``key`` is assigned
        on a fresh copy of ``cls.settings``; without it the current value
        is returned.
        """
        if not args:
            return cls.settings[key]
        cls.settings = cls.settings.copy()  # Make settings local to class
        cls.settings[key] = args[0]

    def prepare(self, **options):
        """Run preparations (parsing, caching, ...).

        Must be implemented by subclasses. It should be safe to call this
        again to refresh a template or to apply updated settings.
        """
        raise NotImplementedError

    def render(self, *args, **kwargs):
        """Render the template and return a single byte or unicode string.

        Must be implemented by subclasses and must be thread-safe. A byte
        string result has to match ``self.encoding``. Template variables
        may be passed as dictionaries (``args``) or keywords (``kwargs``).
        """
        raise NotImplementedError

__init__(source=None, name=None, lookup=None, encoding='utf8', **settings)

Create a new template. If the source parameter (str or buffer) is missing, the name argument is used to guess a template filename. Subclasses can assume that self.source and/or self.filename are set. Both are strings. The lookup, encoding and settings parameters are stored as instance variables. The lookup parameter stores a list containing directory paths. The encoding parameter should be used to decode byte strings or files. The settings parameter contains a dict for engine-specific settings.

Source code in src/sgn/bottle.py
def __init__(
    self, source=None, name=None, lookup=None, encoding="utf8", **settings
):
    """Create a new template.

    If ``source`` (a string or an object with ``read()``) is missing,
    ``name`` is used to look up a template file through ``self.search``.
    The lookup directories are stored as absolute paths, ``encoding`` is
    stored as given, and ``settings`` are merged over a copy of the
    class-level settings before being passed to ``self.prepare``.

    Raises ``TemplateError`` when the named template is not found or when
    neither a source nor a filename is available.
    """
    reads_like_file = hasattr(source, "read")
    has_filename = hasattr(source, "filename")

    self.name = name
    self.source = source.read() if reads_like_file else source
    self.filename = source.filename if has_filename else None
    self.lookup = [os.path.abspath(path) for path in (lookup or [])]
    self.encoding = encoding

    # Copy the class-level dict first so instance settings stay private.
    instance_settings = self.settings.copy()
    instance_settings.update(settings)
    self.settings = instance_settings

    if not self.source:
        if self.name:
            self.filename = self.search(self.name, self.lookup)
            if not self.filename:
                raise TemplateError("Template %s not found." % repr(name))
        if not self.filename:
            raise TemplateError("No template specified.")

    self.prepare(**self.settings)

global_config(key, *args) classmethod

This reads or sets the global settings stored in class.settings.

Source code in src/sgn/bottle.py
@classmethod
def global_config(cls, key, *args):
    """Read or write a class-wide setting stored in ``cls.settings``.

    Called with a value, the setting is written to a fresh copy of the
    settings dict; called without one, the current value is returned.
    """
    if not args:
        return cls.settings[key]
    local_settings = cls.settings.copy()  # Make settings local to class
    cls.settings = local_settings
    cls.settings[key] = args[0]

prepare(**options)

Run preparations (parsing, caching, ...). It should be possible to call this again to refresh a template or to update settings.

Source code in src/sgn/bottle.py
def prepare(self, **options):
    """Prepare the template for rendering (parsing, caching, ...).

    Abstract: this base implementation always raises
    ``NotImplementedError``. Implementations should tolerate repeated
    calls so a template can be refreshed or re-configured.
    """
    raise NotImplementedError

render(*args, **kwargs)

Render the template with the specified local variables and return a single byte or unicode string. If it is a byte string, the encoding must match self.encoding. This method must be thread-safe! Local variables may be provided in dictionaries (args) or directly, as keywords (kwargs).

Source code in src/sgn/bottle.py
def render(self, *args, **kwargs):
    """Render the template and return one byte or unicode string.

    Abstract: this base implementation always raises
    ``NotImplementedError``. Implementations must be thread-safe; a byte
    string result has to match ``self.encoding``. Template variables may
    be supplied as dictionaries (``args``) or as keywords (``kwargs``).
    """
    raise NotImplementedError

search(name, lookup=None) classmethod

Search name in all directories specified in lookup. First without, then with common extensions. Return first hit.

Source code in src/sgn/bottle.py
@classmethod
def search(cls, name, lookup=None):
    """Find ``name`` in the directories listed in ``lookup``.

    For every directory the bare name is checked first, then the name
    with each of ``cls.extensions`` appended. The first existing file wins;
    ``None`` is returned if there is no hit. Paths that resolve outside of
    the lookup directory are skipped.
    """
    if not lookup:
        raise depr(
            0,
            12,
            "Empty template lookup path.",
            "Configure a template lookup path.",
        )

    if os.path.isabs(name):
        raise depr(
            0,
            12,
            "Use of absolute path for template name.",
            "Refer to templates with names or paths relative to the lookup path.",
        )

    for directory in lookup:
        base = os.path.abspath(directory) + os.sep
        plain = os.path.abspath(os.path.join(base, name))
        if plain.startswith(base):
            # Plain name first, then one candidate per known extension.
            candidates = [plain]
            candidates.extend("%s.%s" % (plain, ext) for ext in cls.extensions)
            for candidate in candidates:
                if os.path.isfile(candidate):
                    return candidate
    return None

BjoernServer

Bases: ServerAdapter


              flowchart TD
              sgn.bottle.BjoernServer[BjoernServer]
              sgn.bottle.ServerAdapter[ServerAdapter]

                              sgn.bottle.ServerAdapter --> sgn.bottle.BjoernServer
                


              click sgn.bottle.BjoernServer href "" "sgn.bottle.BjoernServer"
              click sgn.bottle.ServerAdapter href "" "sgn.bottle.ServerAdapter"
            

Fast server written in C: https://github.com/jonashaag/bjoern

Source code in src/sgn/bottle.py
class BjoernServer(ServerAdapter):
    """Adapter for bjoern, a fast server written in C:
    https://github.com/jonashaag/bjoern"""

    def run(self, handler):
        """Serve ``handler`` on ``self.host``/``self.port`` with
        ``reuse_port`` enabled."""
        # Imported lazily so bjoern is only required when this adapter runs.
        import bjoern

        bjoern.run(handler, self.host, self.port, reuse_port=True)

Bottle

Bases: object


              flowchart TD
              sgn.bottle.Bottle[Bottle]

              

              click sgn.bottle.Bottle href "" "sgn.bottle.Bottle"
            

Each Bottle object represents a single, distinct web application and consists of routes, callbacks, plugins, resources and configuration. Instances are callable WSGI applications.

:param catchall: If true (default), handle all exceptions. Turn off to let debugging middleware handle exceptions.

Source code in src/sgn/bottle.py
 693
 694
 695
 696
 697
 698
 699
 700
 701
 702
 703
 704
 705
 706
 707
 708
 709
 710
 711
 712
 713
 714
 715
 716
 717
 718
 719
 720
 721
 722
 723
 724
 725
 726
 727
 728
 729
 730
 731
 732
 733
 734
 735
 736
 737
 738
 739
 740
 741
 742
 743
 744
 745
 746
 747
 748
 749
 750
 751
 752
 753
 754
 755
 756
 757
 758
 759
 760
 761
 762
 763
 764
 765
 766
 767
 768
 769
 770
 771
 772
 773
 774
 775
 776
 777
 778
 779
 780
 781
 782
 783
 784
 785
 786
 787
 788
 789
 790
 791
 792
 793
 794
 795
 796
 797
 798
 799
 800
 801
 802
 803
 804
 805
 806
 807
 808
 809
 810
 811
 812
 813
 814
 815
 816
 817
 818
 819
 820
 821
 822
 823
 824
 825
 826
 827
 828
 829
 830
 831
 832
 833
 834
 835
 836
 837
 838
 839
 840
 841
 842
 843
 844
 845
 846
 847
 848
 849
 850
 851
 852
 853
 854
 855
 856
 857
 858
 859
 860
 861
 862
 863
 864
 865
 866
 867
 868
 869
 870
 871
 872
 873
 874
 875
 876
 877
 878
 879
 880
 881
 882
 883
 884
 885
 886
 887
 888
 889
 890
 891
 892
 893
 894
 895
 896
 897
 898
 899
 900
 901
 902
 903
 904
 905
 906
 907
 908
 909
 910
 911
 912
 913
 914
 915
 916
 917
 918
 919
 920
 921
 922
 923
 924
 925
 926
 927
 928
 929
 930
 931
 932
 933
 934
 935
 936
 937
 938
 939
 940
 941
 942
 943
 944
 945
 946
 947
 948
 949
 950
 951
 952
 953
 954
 955
 956
 957
 958
 959
 960
 961
 962
 963
 964
 965
 966
 967
 968
 969
 970
 971
 972
 973
 974
 975
 976
 977
 978
 979
 980
 981
 982
 983
 984
 985
 986
 987
 988
 989
 990
 991
 992
 993
 994
 995
 996
 997
 998
 999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271
1272
1273
1274
1275
1276
class Bottle(object):
    """Each Bottle object represents a single, distinct web application and
    consists of routes, callbacks, plugins, resources and configuration.
    Instances are callable WSGI applications.

    :param catchall: If true (default), handle all exceptions. Turn off to
                     let debugging middleware handle exceptions.
    """

    @lazy_attribute
    def _global_config(cls):
        """Build the class-level :class:`ConfigDict`; ``catchall`` is
        registered with ``bool`` as its ``validate`` meta entry."""
        base_config = ConfigDict()
        base_config.meta_set("catchall", "validate", bool)
        return base_config

    def __init__(self, **kwargs):
        """Set up configuration, routing state and the core plugins.

        :param kwargs: Only ``catchall=False`` and ``autojson=False`` are
          examined. Both are deprecated in favour of the corresponding
          ``app.config`` keys and are reported through ``depr``.
        """
        #: A :class:`ConfigDict` for app specific configuration.
        self.config = self._global_config._make_overlay()
        # Forward every configuration change to the "config" hook.
        self.config._add_change_listener(functools.partial(self.trigger_hook, "config"))

        # Exceptions are caught by default; see the deprecated keyword below.
        self.config.update({"catchall": True})

        if kwargs.get("catchall") is False:
            depr(
                0,
                13,
                "Bottle(catchall) keyword argument.",
                "The 'catchall' setting is now part of the app "
                "configuration. Fix: `app.config['catchall'] = False`",
            )
            self.config["catchall"] = False
        if kwargs.get("autojson") is False:
            depr(
                0,
                13,
                "Bottle(autojson) keyword argument.",
                "The 'autojson' setting is now part of the app "
                "configuration. Fix: `app.config['json.enable'] = False`",
            )
            self.config["json.enable"] = False

        # Applications mounted through _mount_app().
        self._mounts = []

        #: A :class:`ResourceManager` for application files
        self.resources = ResourceManager()

        self.routes = []  # List of installed :class:`Route` instances.
        self.router = Router()  # Maps requests to :class:`Route` instances.
        self.error_handler = {}  # Filled by error(): int status code -> callback.

        # Core plugins
        self.plugins = []  # List of installed plugins.
        self.install(JSONPlugin())
        self.install(TemplatePlugin())

    #: If true, most exceptions are caught and returned as :exc:`HTTPError`
    catchall = DictProperty("config", "catchall")

    # Hook names that get a callback list in ``_hooks``.
    __hook_names = "before_request", "after_request", "app_reset", "config"
    # Hooks whose callbacks add_hook() inserts at the front of the list.
    __hook_reversed = {"after_request"}

    @cached_property
    def _hooks(self):
        """Map every known hook name to its own, initially empty, list of
        callbacks."""
        return {hook_name: [] for hook_name in self.__hook_names}

    def add_hook(self, name, func):
        """Register ``func`` as a callback for the hook called ``name``.

        The following hook names are known to this class:

        before_request
            Executed once before each request. The request context is
            available, but no routing has happened yet.
        after_request
            Executed once after each request regardless of its outcome.
        app_reset
            Called whenever :meth:`Bottle.reset` is called.
        config
            Triggered by the change listener installed on the
            application's ``config``.

        Callbacks for ``after_request`` are inserted at the front of the
        callback list; for every other hook they are appended.
        """
        insert_first = name in self.__hook_reversed
        callbacks = self._hooks[name]
        if insert_first:
            callbacks.insert(0, func)
            return
        callbacks.append(func)

    def remove_hook(self, name, func):
        """Detach ``func`` from the hook ``name``.

        Returns ``True`` when the callback was registered and has been
        removed; otherwise nothing changes and ``None`` is returned.
        """
        if name not in self._hooks:
            return None
        callbacks = self._hooks[name]
        if func not in callbacks:
            return None
        callbacks.remove(func)
        return True

    def trigger_hook(self, __name, *args, **kwargs):
        """Call every callback of the given hook with ``args``/``kwargs``
        and return the list of their results."""
        # Iterate over a snapshot so callbacks may add or remove hooks.
        snapshot = list(self._hooks[__name])
        results = []
        for callback in snapshot:
            results.append(callback(*args, **kwargs))
        return results

    def hook(self, name):
        """Decorator form of :meth:`add_hook`: the decorated function is
        attached to the hook ``name`` and returned unchanged."""

        def register(func):
            self.add_hook(name, func)
            return func

        return register

    def _mount_wsgi(self, prefix, app, **options):
        """Mount a plain WSGI application below ``prefix``.

        Registers routes whose callback shifts the request path by the
        number of prefix segments, calls ``app`` with the request environ
        and collects status, headers and body into an :class:`HTTPResponse`.

        :param prefix: URL prefix; must contain at least one path segment.
        :param app: WSGI callable invoked as ``app(environ, start_response)``.
        :param options: route options; ``skip``, ``method`` and
          ``mountpoint`` get defaults, ``callback`` is always overwritten.
        :raises ValueError: if ``prefix`` has no non-empty segment.
        """
        segments = [p for p in prefix.split("/") if p]
        if not segments:
            raise ValueError('WSGI applications cannot be mounted to "/".')
        path_depth = len(segments)

        def mountpoint_wrapper():
            try:
                # Move the prefix segments out of the request path for the
                # duration of the call; undone in the ``finally`` below.
                request.path_shift(path_depth)
                rs = HTTPResponse([])

                def start_response(status, headerlist, exc_info=None):
                    if exc_info:
                        _raise(*exc_info)
                    if py3k:
                        # Errors here mean that the mounted WSGI app did not
                        # follow PEP-3333 (which requires latin1) or used a
                        # pre-encoding other than utf8 :/
                        status = status.encode("latin1").decode("utf8")
                        headerlist = [
                            (k, v.encode("latin1").decode("utf8"))
                            for (k, v) in headerlist
                        ]
                    rs.status = status
                    for name, value in headerlist:
                        rs.add_header(name, value)
                    # Returned as the WSGI ``write`` callable: chunks are
                    # appended to the initial body list.
                    return rs.body.append

                body = app(request.environ, start_response)
                # Chunks written through ``write`` go before the returned body.
                rs.body = itertools.chain(rs.body, body) if rs.body else body
                return rs
            finally:
                request.path_shift(-path_depth)

        # Caller-supplied options win over these defaults.
        options.setdefault("skip", True)
        options.setdefault("method", "PROXY")
        options.setdefault("mountpoint", {"prefix": prefix, "target": app})
        options["callback"] = mountpoint_wrapper

        self.route("/%s/<:re:.*>" % "/".join(segments), **options)
        # Without a trailing slash the bare prefix itself is routed as well.
        if not prefix.endswith("/"):
            self.route("/" + "/".join(segments), **options)

    def _mount_app(self, prefix, app, **options):
        """Mount another :class:`Bottle` application below ``prefix``.

        The child's routes are re-rooted under ``prefix`` and added to this
        application. The method falls back to :meth:`_mount_wsgi` (after a
        deprecation notice) when the child is already mounted, when route
        options are given, or when ``prefix`` does not end in ``/``.
        """
        already_mounted = app in self._mounts or "_mount.app" in app.config
        if already_mounted:
            depr(
                0,
                13,
                "Application mounted multiple times. Falling back to WSGI mount.",
                "Clone application before mounting to a different location.",
            )
            return self._mount_wsgi(prefix, app, **options)

        if options:
            depr(
                0,
                13,
                "Unsupported mount options. Falling back to WSGI mount.",
                "Do not specify any route options when mounting bottle application.",
            )
            return self._mount_wsgi(prefix, app, **options)

        has_trailing_slash = prefix.endswith("/")
        if not has_trailing_slash:
            depr(
                0,
                13,
                "Prefix must end in '/'. Falling back to WSGI mount.",
                "Consider adding an explicit redirect from '/prefix' to '/prefix/' in the parent application.",
            )
            return self._mount_wsgi(prefix, app, **options)

        # Record the mount on both sides before adopting the child's routes.
        self._mounts.append(app)
        app.config["_mount.prefix"] = prefix
        app.config["_mount.app"] = self
        for child_route in app.routes:
            child_route.rule = prefix + child_route.rule.lstrip("/")
            self.add_route(child_route)

    def mount(self, prefix, app, **options):
        """Mount an application (:class:`Bottle` or plain WSGI) to a specific
        URL prefix. Example::

            parent_app.mount('/prefix/', child_app)

        :param prefix: path prefix or `mount-point`; must start with ``/``.
        :param app: an instance of :class:`Bottle` or a WSGI application.
        :raises ValueError: if ``prefix`` does not start with ``/``.

        Plugins from the parent application are not applied to the routes
        of the mounted child application. If you need plugins in the child
        application, install them separately.

        While it is possible to use path wildcards within the prefix path
        (:class:`Bottle` children only), it is highly discouraged.

        The prefix path must end with a slash. If you want to access the
        root of the child application via `/prefix` in addition to
        `/prefix/`, consider adding a route with a 307 redirect to the
        parent application.
        """
        if not prefix.startswith("/"):
            raise ValueError("Prefix must start with '/'")

        # Bottle instances get merged routes, anything else a WSGI proxy.
        mounter = self._mount_app if isinstance(app, Bottle) else self._mount_wsgi
        return mounter(prefix, app, **options)

    def merge(self, routes):
        """Add the routes of another :class:`Bottle` application, or an
        iterable of :class:`Route` objects, to this application. Routes are
        added through :meth:`add_route`, so their :data:`Route.app`
        attribute (the 'owner') is left unchanged."""
        incoming = routes.routes if isinstance(routes, Bottle) else routes
        for item in incoming:
            self.add_route(item)

    def install(self, plugin):
        """Register a plugin so it is applied to all routes of this
        application, and return it.

        A plugin is either a plain decorator (any callable) or an object
        with an ``apply()`` method. If it has a ``setup()`` method, that is
        called with this application before the plugin is validated.

        :raises TypeError: if the plugin is neither callable nor provides
          ``apply()``.
        """
        if hasattr(plugin, "setup"):
            plugin.setup(self)
        usable = callable(plugin) or hasattr(plugin, "apply")
        if not usable:
            raise TypeError("Plugins must be callable or implement .apply()")
        self.plugins.append(plugin)
        # Routes must be reset so the new plugin gets applied to them.
        self.reset()
        return plugin

    def uninstall(self, plugin):
        """Remove plugins and return the list of removed ones.

        ``plugin`` selects what to remove: an instance removes exactly that
        plugin, a type removes all plugins of exactly that type, a string
        removes plugins whose ``name`` attribute equals it, and ``True``
        removes everything. Removed plugins with a ``close()`` method are
        closed; the application is reset if anything was removed.
        """
        selector = plugin
        removed = []
        # Walk a snapshot back to front so deleting by index stays valid.
        snapshot = list(enumerate(self.plugins))
        for index, candidate in reversed(snapshot):
            selected = (
                selector is True
                or selector is candidate
                or selector is type(candidate)
                or getattr(candidate, "name", True) == selector
            )
            if not selected:
                continue
            removed.append(candidate)
            del self.plugins[index]
            if hasattr(candidate, "close"):
                candidate.close()
        if removed:
            self.reset()
        return removed

    def reset(self, route=None):
        """Reset routes so plugins are re-applied, then fire ``app_reset``.

        :param route: ``None`` resets all routes; a :class:`Route` resets
          just that route; any other value is used as an index into
          ``self.routes``.

        When ``DEBUG`` is set, the affected routes are prepared again
        right away.
        """
        if route is None:
            targets = self.routes
        elif isinstance(route, Route):
            targets = [route]
        else:
            targets = [self.routes[route]]

        for target in targets:
            target.reset()
        if DEBUG:
            for target in targets:
                target.prepare()
        self.trigger_hook("app_reset")

    def close(self):
        """Close the application by calling ``close()`` on every installed
        plugin that provides it."""
        for installed in self.plugins:
            if not hasattr(installed, "close"):
                continue
            installed.close()

    def run(self, **kwargs):
        """Call the module-level :func:`run` with this application as the
        first argument and ``kwargs`` passed through unchanged."""
        app = self
        run(app, **kwargs)

    def match(self, environ):
        """Look up the route for ``environ`` via ``self.router``.

        Returns a (:class:`Route`, urlargs) tuple, where ``urlargs`` is a
        dictionary of parameters extracted from the URL. Raises
        :exc:`HTTPError` (404/405) on a non-match.
        """
        matched = self.router.match(environ)
        return matched

    def get_url(self, routename, **kargs):
        """Return a URL string for the named route, prefixed with the
        current request's ``SCRIPT_NAME``."""
        script_name = request.environ.get("SCRIPT_NAME", "")
        script_prefix = script_name.strip("/") + "/"
        route_path = self.router.build(routename, **kargs).lstrip("/")
        base = urljoin("/", script_prefix)
        return urljoin(base, route_path)

    def add_route(self, route):
        """Register an existing route object with this application without
        touching its :data:`Route.app` attribute."""
        self.routes.append(route)
        rule, verb, route_name = route.rule, route.method, route.name
        self.router.add(rule, verb, route, name=route_name)
        # In debug mode the route is prepared immediately after being added.
        if DEBUG:
            route.prepare()

    def route(
        self,
        path=None,
        method="GET",
        callback=None,
        name=None,
        apply=None,
        skip=None,
        **config
    ):
        """A decorator to bind a function to a request URL. Example::

            @app.route('/hello/<name>')
            def hello(name):
                return 'Hello %s' % name

        The ``<name>`` part is a wildcard. See :class:`Router` for syntax
        details.

        :param path: Request path or a list of paths to listen to. If no
          path is specified, it is automatically generated from the
          signature of the function.
        :param method: HTTP method (`GET`, `POST`, `PUT`, ...) or a list of
          methods to listen to; upper-cased before use. (default: `GET`)
        :param callback: An optional shortcut to avoid the decorator
          syntax. ``route(..., callback=func)`` equals ``route(...)(func)``.
          A string is resolved with ``load()``.
        :param name: The name for this route. (default: None)
        :param apply: A decorator or plugin or a list of plugins. These are
          applied to the route callback in addition to installed plugins.
        :param skip: A list of plugins, plugin classes or names. Matching
          plugins are not installed to this route. ``True`` skips all.

        Any additional keyword arguments are stored as route-specific
        configuration and passed to plugins (see :meth:`Plugin.apply`).
        """
        if callable(path):
            # Used as a bare ``@app.route``: "path" is really the callback.
            path, callback = None, path
        extra_plugins = makelist(apply)
        skipped = makelist(skip)

        def decorator(callback):
            if isinstance(callback, basestring):
                callback = load(callback)
            rules = makelist(path) or yieldroutes(callback)
            # One Route per (rule, method) combination.
            for rule in rules:
                for verb in makelist(method):
                    self.add_route(
                        Route(
                            self,
                            rule,
                            verb.upper(),
                            callback,
                            name=name,
                            plugins=extra_plugins,
                            skiplist=skipped,
                            **config
                        )
                    )
            return callback

        if callback:
            return decorator(callback)
        return decorator

    def get(self, path=None, method="GET", **options):
        """Shortcut for :meth:`route`; ``method`` defaults to ``GET``."""
        registration = self.route(path, method, **options)
        return registration

    def post(self, path=None, method="POST", **options):
        """Shortcut for :meth:`route`; ``method`` defaults to ``POST``."""
        registration = self.route(path, method, **options)
        return registration

    def put(self, path=None, method="PUT", **options):
        """Shortcut for :meth:`route`; ``method`` defaults to ``PUT``."""
        registration = self.route(path, method, **options)
        return registration

    def delete(self, path=None, method="DELETE", **options):
        """Shortcut for :meth:`route`; ``method`` defaults to ``DELETE``."""
        registration = self.route(path, method, **options)
        return registration

    def patch(self, path=None, method="PATCH", **options):
        """Shortcut for :meth:`route`; ``method`` defaults to ``PATCH``."""
        registration = self.route(path, method, **options)
        return registration

    def error(self, code=500, callback=None):
        """Register an output handler for a HTTP error code. Can
        be used as a decorator or called directly ::

            def error_handler_500(error):
                return 'error_handler_500'

            app.error(code=500, callback=error_handler_500)

            @app.error(404)
            def error_handler_404(error):
                return 'error_handler_404'

        The handler is stored in ``self.error_handler`` under
        ``int(code)``; a string callback is resolved with ``load()``.
        """

        def register(callback):
            handler = load(callback) if isinstance(callback, basestring) else callback
            self.error_handler[int(code)] = handler
            return handler

        if callback:
            return register(callback)
        return register

    def default_error_handler(self, res):
        """Render ``res`` with ``ERROR_PAGE_TEMPLATE`` (exposed to the
        template as ``e``) and return the result converted by ``tob()``."""
        settings = dict(name="__ERROR_PAGE_TEMPLATE")
        page = template(ERROR_PAGE_TEMPLATE, e=res, template_settings=settings)
        return tob(page)

    def _handle(self, environ):
        """Dispatch a single request and return the raw, not yet WSGI-ready
        result.

        Binds ``request`` and ``response`` to this request, triggers the
        ``before_request`` hooks, matches and calls a route, and triggers
        the ``after_request`` hooks.

        :param environ: WSGI environment dict. It is modified in place: the
          keys ``bottle.raw_path``, ``bottle.app``, ``route.handle``,
          ``bottle.route`` and ``route.url_args`` are written, plus
          ``bottle.exc_info`` when an unexpected exception is converted.
        :returns: The route callback's return value, the
          :class:`HTTPResponse` that was raised instead, or an
          :class:`HTTPError` (500) wrapping an unexpected exception.
        :raises: ``KeyboardInterrupt``, ``SystemExit`` and ``MemoryError``
          always propagate; any other exception propagates only if
          ``self.catchall`` is false.
        """
        # Keep the path exactly as received before it is re-decoded below.
        path = environ["bottle.raw_path"] = environ["PATH_INFO"]
        if py3k:
            # Reinterpret the latin1 text as utf8, dropping undecodable bytes.
            # NOTE(review): presumably because WSGI servers hand PATH_INFO
            # over as latin1-decoded text (PEP 3333) -- confirm.
            environ["PATH_INFO"] = path.encode("latin1").decode("utf8", "ignore")

        environ["bottle.app"] = self
        request.bind(environ)
        response.bind()

        try:
            out = None
            try:
                self.trigger_hook("before_request")
                route, args = self.router.match(environ)
                # The route is published under two keys.
                environ["route.handle"] = route
                environ["bottle.route"] = route
                environ["route.url_args"] = args
                out = route.call(**args)
            except HTTPResponse as E:
                # A raised response is treated like a returned one.
                out = E
            finally:
                # Apply a response object to the global ``response`` before
                # the after_request hooks run, so hooks see its values.
                if isinstance(out, HTTPResponse):
                    out.apply(response)
                try:
                    # Runs on success and on failure alike.
                    self.trigger_hook("after_request")
                except HTTPResponse as E:
                    # A response raised by a hook replaces the previous result.
                    out = E
                    out.apply(response)
        except (KeyboardInterrupt, SystemExit, MemoryError):
            raise
        except Exception as E:
            if not self.catchall:
                raise
            # Log the traceback to the server's error stream and remember the
            # exc_info so that wsgi() can pass it on to start_response().
            stacktrace = format_exc()
            environ["wsgi.errors"].write(stacktrace)
            environ["wsgi.errors"].flush()
            environ["bottle.exc_info"] = sys.exc_info()
            out = HTTPError(500, "Internal Server Error", E, stacktrace)
            out.apply(response)

        return out

    def _cast(self, out, peek=None):
        """Try to convert the parameter into something WSGI compatible and set
        correct HTTP headers when possible.
        Support: False, str, unicode, dict, HTTPResponse, HTTPError, file-like,
        iterable of strings and iterable of unicodes

        :param out: The value produced by :meth:`_handle` (or, on recursive
          calls, a response body or error-handler result).
        :param peek: Not used by this method body.
        :returns: A list of byte strings, a file wrapper, or an iterator
          yielding byte strings.
        :raises: Exceptions raised while fetching the first chunk of an
          iterable propagate if ``self.catchall`` is false;
          ``KeyboardInterrupt``, ``SystemExit`` and ``MemoryError`` always.
        """

        # Empty output is done here
        if not out:
            if "Content-Length" not in response:
                response["Content-Length"] = 0
            return []
        # Join lists of byte or unicode strings. Mixed lists are NOT supported
        # (only the first element's type is inspected).
        if isinstance(out, (tuple, list)) and isinstance(out[0], (bytes, unicode)):
            out = out[0][0:0].join(out)  # b'abc'[0:0] -> b''
        # Encode unicode strings
        if isinstance(out, unicode):
            out = out.encode(response.charset)
        # Byte Strings are just returned
        if isinstance(out, bytes):
            if "Content-Length" not in response:
                response["Content-Length"] = len(out)
            return [out]
        # HTTPError or HTTPException (recursive, because they may wrap anything)
        # TODO: Handle these explicitly in handle() or make them iterable.
        # HTTPError is tested before HTTPResponse so that errors are rendered
        # by the handler registered for their status code (or the default one).
        if isinstance(out, HTTPError):
            out.apply(response)
            out = self.error_handler.get(out.status_code, self.default_error_handler)(
                out
            )
            return self._cast(out)
        if isinstance(out, HTTPResponse):
            out.apply(response)
            return self._cast(out.body)

        # File-like objects.
        if hasattr(out, "read"):
            # Prefer the server-provided wrapper when the environ offers one.
            if "wsgi.file_wrapper" in request.environ:
                return request.environ["wsgi.file_wrapper"](out)
            elif hasattr(out, "close") or not hasattr(out, "__iter__"):
                return WSGIFileWrapper(out)
            # Otherwise (iterable, no close()) fall through to iteration.

        # Handle Iterables. We peek into them to detect their inner type.
        try:
            iout = iter(out)
            first = next(iout)
            # Skip leading falsy items (e.g. empty strings).
            while not first:
                first = next(iout)
        except StopIteration:
            # Nothing but falsy items: treat as empty output.
            return self._cast("")
        except HTTPResponse as E:
            first = E
        except (KeyboardInterrupt, SystemExit, MemoryError):
            raise
        except Exception as error:
            if not self.catchall:
                raise
            first = HTTPError(500, "Unhandled exception", error, format_exc())

        # These are the inner types allowed in iterator or generator objects.
        if isinstance(first, HTTPResponse):
            return self._cast(first)
        elif isinstance(first, bytes):
            # Put the consumed first item back in front of the rest.
            new_iter = itertools.chain([first], iout)
        elif isinstance(first, unicode):
            encoder = lambda x: x.encode(response.charset)
            new_iter = imap(encoder, itertools.chain([first], iout))
        else:
            msg = "Unsupported response type: %s" % type(first)
            return self._cast(HTTPError(500, msg))
        # Wrap the new iterator together with the original's close() method.
        if hasattr(out, "close"):
            new_iter = _closeiter(new_iter, out.close)
        return new_iter

    def wsgi(self, environ, start_response):
        """The bottle WSGI-interface.

        Handles the request described by ``environ``, reports status and
        headers through ``start_response`` and returns the body iterable.
        If anything in that sequence fails and ``self.catchall`` is true, a
        minimal HTML 500 page is produced (with error details when ``DEBUG``
        is set) and also written to ``environ["wsgi.errors"]``.
        """
        try:
            body = self._cast(self._handle(environ))
            # rfc2616 section 4.3
            bodiless_status = response._status_code in (100, 101, 204, 304)
            if bodiless_status or environ["REQUEST_METHOD"] == "HEAD":
                if hasattr(body, "close"):
                    body.close()
                body = []
            # Hand over (and clear) the exc_info left behind by _handle().
            pending_exc = environ.get("bottle.exc_info")
            if pending_exc is not None:
                del environ["bottle.exc_info"]
            status_line = response._wsgi_status_line()
            start_response(status_line, response.headerlist, pending_exc)
            return body
        except (KeyboardInterrupt, SystemExit, MemoryError):
            raise
        except Exception as E:
            if not self.catchall:
                raise
            requested = html_escape(environ.get("PATH_INFO", "/"))
            err = "<h1>Critical error while processing request: %s</h1>" % requested
            if DEBUG:
                details = (html_escape(repr(E)), html_escape(format_exc()))
                err += (
                    "<h2>Error:</h2>\n<pre>\n%s\n</pre>\n"
                    "<h2>Traceback:</h2>\n<pre>\n%s\n</pre>\n"
                ) % details
            error_stream = environ["wsgi.errors"]
            error_stream.write(err)
            error_stream.flush()
            headers = [("Content-Type", "text/html; charset=UTF-8")]
            start_response("500 INTERNAL SERVER ERROR", headers, sys.exc_info())
            return [tob(err)]

    def __call__(self, environ, start_response):
        """Each instance of :class:'Bottle' is a WSGI application."""
        return self.wsgi(environ, start_response)

    def __enter__(self):
        """Push this application onto ``default_app`` so that it is used as
        default for all module-level shortcuts; returns ``self``."""
        stack = default_app
        stack.push(self)
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Pop the top entry from ``default_app``. Returns ``None``, so an
        exception raised inside the ``with`` block is not suppressed."""
        stack = default_app
        stack.pop()

    def __setattr__(self, name, value):
        if name in self.__dict__:
            raise AttributeError(
                "Attribute %s already defined. Plugin conflict?" % name
            )
        object.__setattr__(self, name, value)

__call__(environ, start_response)

Each instance of :class:'Bottle' is a WSGI application.

Source code in src/sgn/bottle.py
def __call__(self, environ, start_response):
    """Each instance of :class:'Bottle' is a WSGI application."""
    return self.wsgi(environ, start_response)

__enter__()

Use this application as default for all module-level shortcuts.

Source code in src/sgn/bottle.py
def __enter__(self):
    """Push this application onto ``default_app`` so that it is used as
    default for all module-level shortcuts; returns ``self``."""
    stack = default_app
    stack.push(self)
    return self

add_hook(name, func)

Attach a callback to a hook. Three hooks are currently implemented:

- before_request: Executed once before each request. The request context is available, but no routing has happened yet.
- after_request: Executed once after each request regardless of its outcome.
- app_reset: Called whenever :meth:Bottle.reset is called.

Source code in src/sgn/bottle.py
def add_hook(self, name, func):
    """Attach a callback to a hook. Three hooks are currently implemented:

    before_request
        Executed once before each request. The request context is
        available, but no routing has happened yet.
    after_request
        Executed once after each request regardless of its outcome.
    app_reset
        Called whenever :meth:`Bottle.reset` is called.

    Callbacks of hooks listed in ``__hook_reversed`` are inserted at the
    front of the callback list; all others are appended at the end.
    """
    prepend = name in self.__hook_reversed
    callbacks = self._hooks[name]
    callbacks.insert(0 if prepend else len(callbacks), func)

add_route(route)

Add a route object, but do not change the :data:Route.app attribute.

Source code in src/sgn/bottle.py
def add_route(self, route):
    """Append ``route`` to ``self.routes`` and register it with the router
    under its own rule, method and name. The :data:`Route.app` attribute is
    left untouched. When ``DEBUG`` is set, the route is prepared right away.
    """
    self.routes.append(route)
    register = self.router.add
    register(route.rule, route.method, route, name=route.name)
    if not DEBUG:
        return
    route.prepare()

close()

Close the application and all installed plugins.

Source code in src/sgn/bottle.py
def close(self):
    """Close the application by calling ``close()`` on every installed
    plugin that provides such a method."""
    # Lazy filter: each plugin is inspected right before it is closed.
    closable = (plugin for plugin in self.plugins if hasattr(plugin, "close"))
    for plugin in closable:
        plugin.close()

delete(path=None, method='DELETE', **options)

Equals :meth:route with a DELETE method parameter.

Source code in src/sgn/bottle.py
def delete(self, path=None, method="DELETE", **options):
    """Shortcut for :meth:`route` whose ``method`` argument defaults to
    ``DELETE``. All arguments are forwarded unchanged."""
    register = self.route
    return register(path, method, **options)

error(code=500, callback=None)

Register an output handler for a HTTP error code. Can be used as a decorator or called directly ::

def error_handler_500(error):
    return 'error_handler_500'

app.error(code=500, callback=error_handler_500)

@app.error(404)
def error_handler_404(error):
    return 'error_handler_404'
Source code in src/sgn/bottle.py
def error(self, code=500, callback=None):
    """Register an output handler for an HTTP error code.

    Usable as a plain call or as a decorator ::

        def error_handler_500(error):
            return 'error_handler_500'

        app.error(code=500, callback=error_handler_500)

        @app.error(404)
        def error_handler_404(error):
            return 'error_handler_404'

    :param code: Status code; stored as ``int(code)``.
    :param callback: The handler, or a string that is passed to
      ``load()`` to obtain it. If omitted (or falsy), a decorator is
      returned instead.
    :returns: The registered handler, or the decorator.
    """

    def register(callback):
        # String callbacks are resolved through load() before storing.
        handler = load(callback) if isinstance(callback, basestring) else callback
        self.error_handler[int(code)] = handler
        return handler

    if callback:
        return register(callback)
    return register

get(path=None, method='GET', **options)

Equals :meth:route.

Source code in src/sgn/bottle.py
def get(self, path=None, method="GET", **options):
    """Shortcut for :meth:`route` whose ``method`` argument defaults to
    ``GET``. All arguments are forwarded unchanged."""
    register = self.route
    return register(path, method, **options)

get_url(routename, **kargs)

Return a string that matches a named route

Source code in src/sgn/bottle.py
def get_url(self, routename, **kargs):
    """Return a URL string for the named route.

    The path built by the router from ``routename`` and ``kargs`` is joined
    onto the current request's ``SCRIPT_NAME`` (empty if not set).
    """
    script_part = request.environ.get("SCRIPT_NAME", "").strip("/") + "/"
    route_part = self.router.build(routename, **kargs).lstrip("/")
    base = urljoin("/", script_part)
    return urljoin(base, route_part)

hook(name)

Return a decorator that attaches a callback to a hook. See :meth:add_hook for details.

Source code in src/sgn/bottle.py
def hook(self, name):
    """Return a decorator that attaches the decorated callback to the hook
    ``name`` via :meth:`add_hook` and hands the callback back unchanged."""

    def attach(func):
        self.add_hook(name, func)
        return func

    return attach

install(plugin)

Add a plugin to the list of plugins and prepare it for being applied to all routes of this application. A plugin may be a simple decorator or an object that implements the :class:Plugin API.

Source code in src/sgn/bottle.py
def install(self, plugin):
    """Add a plugin to the list of plugins and prepare it for being
    applied to all routes of this application. A plugin may be a simple
    decorator or an object that implements the :class:`Plugin` API.

    If the plugin has a ``setup`` method it is called with this
    application before the plugin is validated.

    :returns: The plugin that was passed in.
    :raises TypeError: if the plugin is neither callable nor has ``apply``.
    """
    if hasattr(plugin, "setup"):
        plugin.setup(self)
    usable = callable(plugin) or hasattr(plugin, "apply")
    if not usable:
        raise TypeError("Plugins must be callable or implement .apply()")
    self.plugins.append(plugin)
    # Reset so that the new plugin gets applied to existing routes.
    self.reset()
    return plugin

match(environ)

Search for a matching route and return a (:class:Route, urlargs) tuple. The second value is a dictionary with parameters extracted from the URL. Raise :exc:HTTPError (404/405) on a non-match.

Source code in src/sgn/bottle.py
def match(self, environ):
    """Look up the route for ``environ`` by delegating to the router.

    Returns a (:class:`Route`, urlargs) tuple, where the second value is a
    dictionary with parameters extracted from the URL. Raises
    :exc:`HTTPError` (404/405) on a non-match.
    """
    router = self.router
    return router.match(environ)

merge(routes)

Merge the routes of another :class:Bottle application or a list of :class:Route objects into this application. The routes keep their 'owner', meaning that the :data:Route.app attribute is not changed.

Source code in src/sgn/bottle.py
def merge(self, routes):
    """Add the routes of another :class:`Bottle` application, or of any
    iterable of :class:`Route` objects, to this application via
    :meth:`add_route`. The routes keep their 'owner': the
    :data:`Route.app` attribute is not changed."""
    incoming = routes.routes if isinstance(routes, Bottle) else routes
    for item in incoming:
        self.add_route(item)

mount(prefix, app, **options)

Mount an application (:class:Bottle or plain WSGI) to a specific URL prefix. Example::

parent_app.mount('/prefix/', child_app)

:param prefix: path prefix or mount-point. :param app: an instance of :class:Bottle or a WSGI application.

Plugins from the parent application are not applied to the routes of the mounted child application. If you need plugins in the child application, install them separately.

While it is possible to use path wildcards within the prefix path (:class:Bottle children only), it is highly discouraged.

The prefix path must end with a slash. If you want to access the root of the child application via /prefix in addition to /prefix/, consider adding a route with a 307 redirect to the parent application.

Source code in src/sgn/bottle.py
def mount(self, prefix, app, **options):
    """Mount an application (:class:`Bottle` or plain WSGI) to a specific
    URL prefix. Example::

        parent_app.mount('/prefix/', child_app)

    :param prefix: path prefix or `mount-point`.
    :param app: an instance of :class:`Bottle` or a WSGI application.
    :raises ValueError: if ``prefix`` does not start with ``/``.

    Plugins from the parent application are not applied to the routes
    of the mounted child application. If you need plugins in the child
    application, install them separately.

    While it is possible to use path wildcards within the prefix path
    (:class:`Bottle` children only), it is highly discouraged.

    The prefix path must end with a slash. If you want to access the
    root of the child application via `/prefix` in addition to
    `/prefix/`, consider adding a route with a 307 redirect to the
    parent application.
    """
    if prefix.startswith("/"):
        # Bottle apps and plain WSGI callables are mounted by different helpers.
        mounter = self._mount_app if isinstance(app, Bottle) else self._mount_wsgi
        return mounter(prefix, app, **options)
    raise ValueError("Prefix must start with '/'")

patch(path=None, method='PATCH', **options)

Equals :meth:route with a PATCH method parameter.

Source code in src/sgn/bottle.py
def patch(self, path=None, method="PATCH", **options):
    """Shortcut for :meth:`route` whose ``method`` argument defaults to
    ``PATCH``. All arguments are forwarded unchanged."""
    register = self.route
    return register(path, method, **options)

post(path=None, method='POST', **options)

Equals :meth:route with a POST method parameter.

Source code in src/sgn/bottle.py
def post(self, path=None, method="POST", **options):
    """Shortcut for :meth:`route` whose ``method`` argument defaults to
    ``POST``. All arguments are forwarded unchanged."""
    register = self.route
    return register(path, method, **options)

put(path=None, method='PUT', **options)

Equals :meth:route with a PUT method parameter.

Source code in src/sgn/bottle.py
def put(self, path=None, method="PUT", **options):
    """Shortcut for :meth:`route` whose ``method`` argument defaults to
    ``PUT``. All arguments are forwarded unchanged."""
    register = self.route
    return register(path, method, **options)

remove_hook(name, func)

Remove a callback from a hook.

Source code in src/sgn/bottle.py
def remove_hook(self, name, func):
    """Remove a callback from a hook.

    :param name: Name of the hook the callback was attached to.
    :param func: The callback to remove. Only its first occurrence in the
      hook's callback list is removed.
    :returns: ``True`` if the callback was found and removed, ``False`` if
      the hook name is unknown or the callback is not attached to it.
    """
    if name in self._hooks and func in self._hooks[name]:
        self._hooks[name].remove(func)
        return True
    # Explicit result instead of falling off the end with an implicit None.
    return False

reset(route=None)

Reset all routes (force plugins to be re-applied) and clear all caches. If an ID or route object is given, only that specific route is affected.

Source code in src/sgn/bottle.py
def reset(self, route=None):
    """Reset all routes (force plugins to be re-applied) and clear all
    caches. If an ID or route object is given, only that specific route
    is affected.

    An ID is used as an index into ``self.routes``. When ``DEBUG`` is set,
    the affected routes are prepared again immediately. The ``app_reset``
    hook is triggered at the end in every case.
    """
    if route is None:
        targets = self.routes
    elif isinstance(route, Route):
        targets = [route]
    else:
        targets = [self.routes[route]]
    for target in targets:
        target.reset()
    if DEBUG:
        for target in targets:
            target.prepare()
    self.trigger_hook("app_reset")

route(path=None, method='GET', callback=None, name=None, apply=None, skip=None, **config)

A decorator to bind a function to a request URL. Example::

@app.route('/hello/<name>')
def hello(name):
    return 'Hello %s' % name

The <name> part is a wildcard. See :class:Router for syntax details.

:param path: Request path or a list of paths to listen to. If no path is specified, it is automatically generated from the signature of the function. :param method: HTTP method (GET, POST, PUT, ...) or a list of methods to listen to. (default: GET) :param callback: An optional shortcut to avoid the decorator syntax. route(..., callback=func) equals route(...)(func) :param name: The name for this route. (default: None) :param apply: A decorator or plugin or a list of plugins. These are applied to the route callback in addition to installed plugins. :param skip: A list of plugins, plugin classes or names. Matching plugins are not installed to this route. True skips all.

Any additional keyword arguments are stored as route-specific configuration and passed to plugins (see :meth:Plugin.apply).

Source code in src/sgn/bottle.py
def route(
    self,
    path=None,
    method="GET",
    callback=None,
    name=None,
    apply=None,
    skip=None,
    **config
):
    """A decorator to bind a function to a request URL. Example::

        @app.route('/hello/<name>')
        def hello(name):
            return 'Hello %s' % name

    The ``<name>`` part is a wildcard. See :class:`Router` for syntax
    details.

    :param path: Request path or a list of paths to listen to. If no
      path is specified, it is automatically generated from the
      signature of the function. A callable passed here is treated as
      the callback (bare ``@app.route`` usage).
    :param method: HTTP method (`GET`, `POST`, `PUT`, ...) or a list of
      methods to listen to. (default: `GET`)
    :param callback: An optional shortcut to avoid the decorator
      syntax. ``route(..., callback=func)`` equals ``route(...)(func)``
    :param name: The name for this route. (default: None)
    :param apply: A decorator or plugin or a list of plugins. These are
      applied to the route callback in addition to installed plugins.
    :param skip: A list of plugins, plugin classes or names. Matching
      plugins are not installed to this route. ``True`` skips all.

    Any additional keyword arguments are stored as route-specific
    configuration and passed to plugins (see :meth:`Plugin.apply`).
    """
    if callable(path):
        path, callback = None, path
    plugins = makelist(apply)
    skiplist = makelist(skip)

    def register(callback):
        # String callbacks are resolved through load() first.
        target = load(callback) if isinstance(callback, basestring) else callback
        rules = makelist(path) or yieldroutes(target)
        for rule in rules:
            # One Route per (rule, upper-cased method) combination.
            for verb in makelist(method):
                new_route = Route(
                    self,
                    rule,
                    verb.upper(),
                    target,
                    name=name,
                    plugins=plugins,
                    skiplist=skiplist,
                    **config
                )
                self.add_route(new_route)
        return target

    if callback:
        return register(callback)
    return register

run(**kwargs)

Calls :func:run with the same parameters.

Source code in src/sgn/bottle.py
def run(self, **kwargs):
    """Call the module-level :func:`run` with this application as first
    argument and the given keyword arguments. Nothing is returned."""
    launch = run
    launch(self, **kwargs)

trigger_hook(__name, *args, **kwargs)

Trigger a hook and return a list of results.

Source code in src/sgn/bottle.py
def trigger_hook(self, __name, *args, **kwargs):
    """Call every callback attached to the named hook with the given
    arguments and return their results as a list, in call order."""
    results = []
    # Iterate over a copy so callbacks may modify the hook list while running.
    for callback in self._hooks[__name][:]:
        results.append(callback(*args, **kwargs))
    return results

uninstall(plugin)

Uninstall plugins. Pass an instance to remove a specific plugin, a type object to remove all plugins that match that type, a string to remove all plugins with a matching name attribute or True to remove all plugins. Return the list of removed plugins.

Source code in src/sgn/bottle.py
def uninstall(self, plugin):
    """Uninstall plugins. Pass an instance to remove a specific plugin, a type
    object to remove all plugins that match that type, a string to remove
    all plugins with a matching ``name`` attribute or ``True`` to remove all
    plugins. Return the list of removed plugins.

    Plugins are examined from last to first; removed plugins that have a
    ``close`` method are closed. The application is reset only if at least
    one plugin was removed.
    """
    target = plugin
    removed = []
    # Snapshot first, then walk backwards so deletions keep indices valid.
    snapshot = list(enumerate(self.plugins))
    for index, candidate in reversed(snapshot):
        matches = (
            target is True
            or target is candidate
            or target is type(candidate)
            or getattr(candidate, "name", True) == target
        )
        if not matches:
            continue
        removed.append(candidate)
        del self.plugins[index]
        if hasattr(candidate, "close"):
            candidate.close()
    if removed:
        self.reset()
    return removed

wsgi(environ, start_response)

The bottle WSGI-interface.

Source code in src/sgn/bottle.py
def wsgi(self, environ, start_response):
    """The bottle WSGI-interface.

    Handles the request described by ``environ``, reports status and
    headers through ``start_response`` and returns the body iterable.
    If anything in that sequence fails and ``self.catchall`` is true, a
    minimal HTML 500 page is produced (with error details when ``DEBUG``
    is set) and also written to ``environ["wsgi.errors"]``.
    """
    try:
        body = self._cast(self._handle(environ))
        # rfc2616 section 4.3
        bodiless_status = response._status_code in (100, 101, 204, 304)
        if bodiless_status or environ["REQUEST_METHOD"] == "HEAD":
            if hasattr(body, "close"):
                body.close()
            body = []
        # Hand over (and clear) the exc_info left behind by _handle().
        pending_exc = environ.get("bottle.exc_info")
        if pending_exc is not None:
            del environ["bottle.exc_info"]
        status_line = response._wsgi_status_line()
        start_response(status_line, response.headerlist, pending_exc)
        return body
    except (KeyboardInterrupt, SystemExit, MemoryError):
        raise
    except Exception as E:
        if not self.catchall:
            raise
        requested = html_escape(environ.get("PATH_INFO", "/"))
        err = "<h1>Critical error while processing request: %s</h1>" % requested
        if DEBUG:
            details = (html_escape(repr(E)), html_escape(format_exc()))
            err += (
                "<h2>Error:</h2>\n<pre>\n%s\n</pre>\n"
                "<h2>Traceback:</h2>\n<pre>\n%s\n</pre>\n"
            ) % details
        error_stream = environ["wsgi.errors"]
        error_stream.write(err)
        error_stream.flush()
        headers = [("Content-Type", "text/html; charset=UTF-8")]
        start_response("500 INTERNAL SERVER ERROR", headers, sys.exc_info())
        return [tob(err)]

BottleException

Bases: Exception


              flowchart TD
              sgn.bottle.BottleException[BottleException]

              

              click sgn.bottle.BottleException href "" "sgn.bottle.BottleException"
            

A base class for exceptions used by bottle.

Source code in src/sgn/bottle.py
class BottleException(Exception):
    """Common base class for the exceptions used by bottle. Adds nothing
    to :class:`Exception`."""

ConfigDict

Bases: dict


              flowchart TD
              sgn.bottle.ConfigDict[ConfigDict]

              

              click sgn.bottle.ConfigDict href "" "sgn.bottle.ConfigDict"
            

A dict-like configuration storage with additional support for namespaces, validators, meta-data and overlays.

This dict-like class is heavily optimized for read access. Read-only methods and item access should be as fast as a native dict.

Source code in src/sgn/bottle.py
class ConfigDict(dict):
    """A dict-like configuration storage with additional support for
    namespaces, validators, meta-data and overlays.

    This dict-like class is heavily optimized for read access.
    Read-only methods and item access should be as fast as a native dict.

    Keys must be strings. Writes go through :meth:`__setitem__`, which
    applies an optional per-key ``filter`` meta function, notifies change
    listeners and pushes the new value to all live overlays.
    """

    # Instance state is kept in slots. ``__weakref__`` is listed so that
    # instances can be weakly referenced (see ``_make_overlay``).
    __slots__ = (
        "_meta",
        "_change_listener",
        "_overlays",
        "_virtual_keys",
        "_source",
        "__weakref__",
    )

    def __init__(self):
        """Create an empty config without meta data, listeners or overlays."""
        #: Meta fields per key: ``{key: {metafield: value}}``.
        self._meta = {}
        #: Callbacks invoked as ``cb(config, key, value)`` on changes.
        self._change_listener = []
        #: Weak references of overlays that need to be kept in sync.
        self._overlays = []
        #: Config that is the source for this overlay.
        self._source = None
        #: Keys of values copied from the source (values we do not own)
        self._virtual_keys = set()

    def load_module(self, name, squash=True):
        """Load values from a Python module.

        Import a python module by name and add all upper-case module-level
        variables to this config dict.

        :param name: Module name to import and load.
        :param squash: If true (default), nested dicts are assumed to
           represent namespaces and flattened (see :meth:`load_dict`).
        :returns: This config (``self``).
        """
        config_obj = load(name)
        # Only names that are entirely upper-case are picked up.
        obj = {
            key: getattr(config_obj, key) for key in dir(config_obj) if key.isupper()
        }

        if squash:
            self.load_dict(obj)
        else:
            self.update(obj)
        return self

    def load_config(self, filename, **options):
        """Load values from ``*.ini`` style config files using configparser.

        INI style sections (e.g. ``[section]``) are used as namespace for
        all keys within that section. Both section and key names may contain
        dots as namespace separators and are converted to lower-case.

        The special sections ``[bottle]`` and ``[ROOT]`` refer to the root
        namespace and the ``[DEFAULT]`` section defines default values for all
        other sections.

        :param filename: The path of a config file, or a list of paths.
        :param options: All keyword parameters are passed to the underlying
            :class:`python:configparser.ConfigParser` constructor call.
        :returns: This config (``self``).

        """
        # Defaults that the caller may override through ``options``.
        options.setdefault("allow_no_value", True)
        if py3k:
            options.setdefault("interpolation", configparser.ExtendedInterpolation())
        conf = configparser.ConfigParser(**options)
        conf.read(filename)
        for section in conf.sections():
            for key in conf.options(section):
                value = conf.get(section, key)
                # Keys of the root sections are stored without a prefix.
                if section not in ("bottle", "ROOT"):
                    key = section + "." + key
                self[key.lower()] = value
        return self

    def load_dict(self, source, namespace=""):
        """Load values from a dictionary structure. Nesting can be used to
        represent namespaces.

        >>> c = ConfigDict()
        >>> c.load_dict({'some': {'namespace': {'key': 'value'} } })
        {'some.namespace.key': 'value'}

        :param source: Mapping with string keys; ``dict`` values are
          flattened recursively.
        :param namespace: Prefix that is joined to every key with a dot.
        :returns: This config (``self``).
        :raises TypeError: if a key is not a string.
        """
        for key, value in source.items():
            if isinstance(key, basestring):
                # strip() removes the leading dot left by an empty namespace.
                nskey = (namespace + "." + key).strip(".")
                if isinstance(value, dict):
                    self.load_dict(value, namespace=nskey)
                else:
                    self[nskey] = value
            else:
                raise TypeError("Key has type %r (not a string)" % type(key))
        return self

    def update(self, *a, **ka):
        """If the first parameter is a string, all keys are prefixed with this
        namespace. Apart from that it works just as the usual dict.update().

        >>> c = ConfigDict()
        >>> c.update('some.namespace', key='value')
        """
        prefix = ""
        if a and isinstance(a[0], basestring):
            prefix = a[0].strip(".") + "."
            a = a[1:]
        # Every item is stored through __setitem__ (filters, listeners,
        # overlays) rather than through dict.update().
        for key, value in dict(*a, **ka).items():
            self[prefix + key] = value

    def setdefault(self, key, value=None):
        """Store ``value`` under ``key`` if the key is missing (through
        :meth:`__setitem__`) and return the value stored under ``key``."""
        if key not in self:
            self[key] = value
        return self[key]

    def __setitem__(self, key, value):
        """Store ``value`` under ``key``, making the key non-virtual.

        The value is first passed through the key's ``filter`` meta field
        (identity if none is set). If the filtered value is the very object
        already stored, nothing else happens. Otherwise change listeners
        are notified, the value is stored and pushed to all overlays.

        :raises TypeError: if ``key`` is not a string.
        """
        if not isinstance(key, basestring):
            raise TypeError("Key has type %r (not a string)" % type(key))

        # An explicit assignment turns a virtual key into an own key, even
        # if the identity check below returns early.
        self._virtual_keys.discard(key)

        value = self.meta_get(key, "filter", lambda x: x)(value)
        if key in self and self[key] is value:
            return

        # Listeners run before the value is stored.
        self._on_change(key, value)
        dict.__setitem__(self, key, value)

        for overlay in self._iter_overlays():
            overlay._set_virtual(key, value)

    def __delitem__(self, key):
        """Delete an own (non-virtual) key.

        If the source config also has the key, the source's value takes its
        place as a virtual entry; otherwise listeners are notified with
        ``None`` and the deletion is pushed to all overlays.

        :raises KeyError: if the key is missing or virtual.
        """
        if key not in self:
            raise KeyError(key)
        if key in self._virtual_keys:
            raise KeyError("Virtual keys cannot be deleted: %s" % key)

        if self._source and key in self._source:
            # Not virtual, but present in source -> Restore virtual value
            dict.__delitem__(self, key)
            self._set_virtual(key, self._source[key])
        else:  # not virtual, not present in source. This is OUR value
            self._on_change(key, None)
            dict.__delitem__(self, key)
            for overlay in self._iter_overlays():
                overlay._delete_virtual(key)

    def _set_virtual(self, key, value):
        """Recursively set or update virtual keys."""
        if key in self and key not in self._virtual_keys:
            return  # Do nothing for non-virtual keys.

        self._virtual_keys.add(key)
        # Listeners are only notified when an existing value is replaced
        # by a different object.
        if key in self and self[key] is not value:
            self._on_change(key, value)
        dict.__setitem__(self, key, value)
        for overlay in self._iter_overlays():
            overlay._set_virtual(key, value)

    def _delete_virtual(self, key):
        """Recursively delete virtual entry."""
        if key not in self._virtual_keys:
            return  # Do nothing for non-virtual keys.

        if key in self:
            self._on_change(key, None)
        dict.__delitem__(self, key)
        self._virtual_keys.discard(key)
        for overlay in self._iter_overlays():
            overlay._delete_virtual(key)

    def _on_change(self, key, value):
        """Call the change listeners in registration order with
        ``(self, key, value)``. Stops at the first listener that returns a
        true value and returns ``True`` then; otherwise returns ``None``."""
        for cb in self._change_listener:
            if cb(self, key, value):
                return True

    def _add_change_listener(self, func):
        """Register ``func`` as a change listener and return it, so that
        this method can be used as a decorator."""
        self._change_listener.append(func)
        return func

    def meta_get(self, key, metafield, default=None):
        """Return the value of a meta field for a key."""
        return self._meta.get(key, {}).get(metafield, default)

    def meta_set(self, key, metafield, value):
        """Set the meta field for a key to a new value.

        Meta-fields are shared between all members of an overlay tree.
        """
        self._meta.setdefault(key, {})[metafield] = value

    def meta_list(self, key):
        """Return an iterable of meta field names defined for a key."""
        return self._meta.get(key, {}).keys()

    def _define(self, key, default=_UNSET, help=_UNSET, validate=_UNSET):
        """(Unstable) Shortcut for plugins to define own config parameters."""
        # ``_UNSET`` marks "not given", so ``None`` can be passed as a value.
        if default is not _UNSET:
            self.setdefault(key, default)
        if help is not _UNSET:
            self.meta_set(key, "help", help)
        if validate is not _UNSET:
            self.meta_set(key, "validate", validate)

    def _iter_overlays(self):
        """Yield every overlay whose weak reference is still alive."""
        for ref in self._overlays:
            overlay = ref()
            if overlay is not None:
                yield overlay

    def _make_overlay(self):
        """(Unstable) Create a new overlay that acts like a chained map: Values
        missing in the overlay are copied from the source map. Both maps
        share the same meta entries.

        Entries that were copied from the source are called 'virtual'. You
        can not delete virtual keys, but overwrite them, which turns them
        into non-virtual entries. Setting keys on an overlay never affects
        its source, but may affect any number of child overlays.

        Other than collections.ChainMap or most other implementations, this
        approach does not resolve missing keys on demand, but instead
        actively copies all values from the source to the overlay and keeps
        track of virtual and non-virtual keys internally. This removes any
        lookup-overhead. Read-access is as fast as a build-in dict for both
        virtual and non-virtual keys.

        Changes are propagated recursively and depth-first. A failing
        on-change handler in an overlay stops the propagation of virtual
        values and may result in an partly updated tree. Take extra care
        here and make sure that on-change handlers never fail.

        Used by Route.config
        """
        # Cleanup dead references
        self._overlays[:] = [ref for ref in self._overlays if ref() is not None]

        overlay = ConfigDict()
        # Same dict object, not a copy: meta data is shared with the source.
        overlay._meta = self._meta
        overlay._source = self
        # Only a weak reference is kept, so the source does not keep the
        # overlay alive.
        self._overlays.append(weakref.ref(overlay))
        for key in self:
            overlay._set_virtual(key, self[key])
        return overlay

load_config(filename, **options)

Load values from *.ini style config files using configparser.

INI style sections (e.g. [section]) are used as namespace for all keys within that section. Both section and key names may contain dots as namespace separators and are converted to lower-case.

The special sections [bottle] and [ROOT] refer to the root namespace and the [DEFAULT] section defines default values for all other sections.

:param filename: The path of a config file, or a list of paths. :param options: All keyword parameters are passed to the underlying :class:python:configparser.ConfigParser constructor call.

Source code in src/sgn/bottle.py
def load_config(self, filename, **options):
    """Populate this config from one or more ``*.ini`` style files.

    Every INI section (e.g. ``[section]``) acts as a namespace prefix for
    the keys it contains. Section and key names may themselves contain
    dots as namespace separators; the combined key is stored lower-cased.

    Keys from the special sections ``[bottle]`` and ``[ROOT]`` are stored
    without a prefix (root namespace). The ``[DEFAULT]`` section supplies
    default values for all other sections.

    :param filename: The path of a config file, or a list of paths.
    :param options: All keyword parameters are passed to the underlying
        :class:`python:configparser.ConfigParser` constructor call.
    :returns: This config object, to allow call chaining.
    """
    root_sections = ("bottle", "ROOT")
    options.setdefault("allow_no_value", True)
    if py3k:
        options.setdefault("interpolation", configparser.ExtendedInterpolation())
    parser = configparser.ConfigParser(**options)
    parser.read(filename)
    for section_name in parser.sections():
        # Root sections contribute bare keys; every other section is used
        # as a dotted prefix.
        prefix = "" if section_name in root_sections else section_name + "."
        for option in parser.options(section_name):
            self[(prefix + option).lower()] = parser.get(section_name, option)
    return self

load_dict(source, namespace='')

Load values from a dictionary structure. Nesting can be used to represent namespaces.

>>> c = ConfigDict()
>>> c.load_dict({'some': {'namespace': {'key': 'value'} } })
{'some.namespace.key': 'value'}

Source code in src/sgn/bottle.py
def load_dict(self, source, namespace=""):
    """Import values from a (possibly nested) dictionary.

    Nested dictionaries are interpreted as namespaces and flattened into
    dotted keys.

    >>> c = ConfigDict()
    >>> c.load_dict({'some': {'namespace': {'key': 'value'} } })
    {'some.namespace.key': 'value'}

    :param source: Mapping to read from; all keys must be strings.
    :param namespace: Dotted prefix applied to every key of *source*.
    :returns: This config object, to allow call chaining.
    :raises TypeError: If a key in *source* is not a string.
    """
    for name, item in source.items():
        if not isinstance(name, basestring):
            raise TypeError("Key has type %r (not a string)" % type(name))
        # strip() drops the leading dot produced by an empty namespace.
        full_key = (namespace + "." + name).strip(".")
        if isinstance(item, dict):
            self.load_dict(item, namespace=full_key)
        else:
            self[full_key] = item
    return self

load_module(name, squash=True)

Load values from a Python module.

Import a python module by name and add all upper-case module-level variables to this config dict.

:param name: Module name to import and load. :param squash: If true (default), nested dicts are assumed to represent namespaces and flattened (see :meth:load_dict).

Source code in src/sgn/bottle.py
def load_module(self, name, squash=True):
    """Import settings from a Python module.

    The module is imported by name and every attribute whose name is
    upper-case is copied into this config dict.

    :param name: Module name to import and load.
    :param squash: If true (default), nested dicts are assumed to
       represent namespaces and flattened (see :meth:`load_dict`).
       Otherwise the values are stored as-is via :meth:`update`.
    :returns: This config object, to allow call chaining.
    """
    module = load(name)
    settings = {}
    for attr in dir(module):
        if attr.isupper():
            settings[attr] = getattr(module, attr)

    importer = self.load_dict if squash else self.update
    importer(settings)
    return self

meta_get(key, metafield, default=None)

Return the value of a meta field for a key.

Source code in src/sgn/bottle.py
def meta_get(self, key, metafield, default=None):
    """Look up a single meta field of *key*.

    Returns *default* if either the key has no meta data at all or the
    requested meta field is not set.
    """
    fields = self._meta.get(key, {})
    return fields.get(metafield, default)

meta_list(key)

Return an iterable of meta field names defined for a key.

Source code in src/sgn/bottle.py
def meta_list(self, key):
    """Return an iterable over the names of all meta fields set for *key*.

    The iterable is empty if no meta data is stored for the key.
    """
    fields = self._meta.get(key, {})
    return fields.keys()

meta_set(key, metafield, value)

Set the meta field for a key to a new value.

Meta-fields are shared between all members of an overlay tree.

Source code in src/sgn/bottle.py
def meta_set(self, key, metafield, value):
    """Assign *value* to the meta field *metafield* of *key*.

    The per-key meta dict is created on demand. Meta-fields are shared
    between all members of an overlay tree.
    """
    fields = self._meta.setdefault(key, {})
    fields[metafield] = value

update(*a, **ka)

If the first parameter is a string, all keys are prefixed with this namespace. Apart from that it works just as the usual dict.update().

c = ConfigDict() c.update('some.namespace', key='value')

Source code in src/sgn/bottle.py
def update(self, *a, **ka):
    """Works like ``dict.update()``, with optional namespace support: if
    the first positional argument is a string, it is used as a namespace
    and prepended (followed by a dot) to every key.

    >>> c = ConfigDict()
    >>> c.update('some.namespace', key='value')
    """
    namespace = ""
    if a and isinstance(a[0], basestring):
        # Consume the namespace argument; the rest goes to dict().
        namespace, a = a[0].strip(".") + ".", a[1:]
    entries = dict(*a, **ka)
    for name in entries:
        self[namespace + name] = entries[name]

DictProperty

Bases: object


              flowchart TD
              sgn.bottle.DictProperty[DictProperty]

              

              click sgn.bottle.DictProperty href "" "sgn.bottle.DictProperty"
            

Property that maps to a key in a local dict-like attribute.

Source code in src/sgn/bottle.py
class DictProperty(object):
    """Property that maps to a key in a local dict-like attribute.

    Used as a decorator factory: the decorated function computes the value
    on first access. The result is stored under ``key`` (defaulting to the
    function name) in the dict-like attribute called ``attr`` on the owning
    instance and is read from there on later accesses.
    """

    def __init__(self, attr, key=None, read_only=False):
        self.attr = attr
        self.key = key
        self.read_only = read_only

    def __call__(self, func):
        # Make the descriptor look like the wrapped function (name, doc, ...).
        functools.update_wrapper(self, func, updated=[])
        self.getter = func
        self.key = self.key or func.__name__
        return self

    def _require_writable(self):
        # Shared guard for __set__ and __delete__.
        if self.read_only:
            raise AttributeError("Read-Only property.")

    def __get__(self, obj, cls):
        if obj is None:
            # Accessed on the class: expose the descriptor itself.
            return self
        name = self.key
        cache = getattr(obj, self.attr)
        if name not in cache:
            cache[name] = self.getter(obj)
        return cache[name]

    def __set__(self, obj, value):
        self._require_writable()
        getattr(obj, self.attr)[self.key] = value

    def __delete__(self, obj):
        self._require_writable()
        del getattr(obj, self.attr)[self.key]

DieselServer

Bases: ServerAdapter


              flowchart TD
              sgn.bottle.DieselServer[DieselServer]
              sgn.bottle.ServerAdapter[ServerAdapter]

                              sgn.bottle.ServerAdapter --> sgn.bottle.DieselServer
                


              click sgn.bottle.DieselServer href "" "sgn.bottle.DieselServer"
              click sgn.bottle.ServerAdapter href "" "sgn.bottle.ServerAdapter"
            

Untested.

Source code in src/sgn/bottle.py
class DieselServer(ServerAdapter):
    """Untested. Serves the WSGI handler through diesel's
    ``WSGIApplication``.
    """

    def run(self, handler):
        """Report the deprecation via ``depr`` and start the diesel app.

        Only ``self.port`` is forwarded to diesel; ``self.host`` is not
        used by this adapter.
        """
        depr(0, 13, "Diesel is not tested or supported and will be removed.")
        from diesel.protocols.wsgi import WSGIApplication

        WSGIApplication(handler, port=self.port).run()

EventletServer

Bases: ServerAdapter


              flowchart TD
              sgn.bottle.EventletServer[EventletServer]
              sgn.bottle.ServerAdapter[ServerAdapter]

                              sgn.bottle.ServerAdapter --> sgn.bottle.EventletServer
                


              click sgn.bottle.EventletServer href "" "sgn.bottle.EventletServer"
              click sgn.bottle.ServerAdapter href "" "sgn.bottle.ServerAdapter"
            

Untested. Options:

  • backlog adjust the eventlet backlog parameter which is the maximum number of queued connections. Should be at least 1; the maximum value is system-dependent.
  • family: (default is 2) socket family, optional. See socket documentation for available families.
Source code in src/sgn/bottle.py
class EventletServer(ServerAdapter):
    """Untested. Options:

    * `backlog` adjust the eventlet backlog parameter which is the maximum
      number of queued connections. Should be at least 1; the maximum
      value is system-dependent.
    * `family`: (default is 2) socket family, optional. See socket
      documentation for available families.
    """

    def run(self, handler):
        """Serve *handler* with eventlet's WSGI server.

        :raises RuntimeError: If ``os`` has not been monkey-patched by
            eventlet.
        """
        from eventlet import listen, patcher, wsgi

        if not patcher.is_monkey_patched(os):
            raise RuntimeError(
                "Bottle requires eventlet.monkey_patch() (before import)"
            )

        # Socket-level options are removed from self.options and handed to
        # listen() instead.
        socket_args = {
            name: self.options.pop(name)
            for name in ("backlog", "family")
            if name in self.options
        }
        address = (self.host, self.port)
        try:
            sock = listen(address, **socket_args)
            wsgi.server(sock, handler, log_output=(not self.quiet))
        except TypeError:
            # Fallback, if we have old version of eventlet
            wsgi.server(listen(address), handler)

FapwsServer

Bases: ServerAdapter


              flowchart TD
              sgn.bottle.FapwsServer[FapwsServer]
              sgn.bottle.ServerAdapter[ServerAdapter]

                              sgn.bottle.ServerAdapter --> sgn.bottle.FapwsServer
                


              click sgn.bottle.FapwsServer href "" "sgn.bottle.FapwsServer"
              click sgn.bottle.ServerAdapter href "" "sgn.bottle.ServerAdapter"
            

Extremely fast webserver using libev. See https://github.com/william-os4y/fapws3

Source code in src/sgn/bottle.py
class FapwsServer(ServerAdapter):
    """Extremely fast webserver using libev. See https://github.com/william-os4y/fapws3"""

    def run(self, handler):  # pragma: no cover
        """Serve the WSGI *handler* through fapws3's ``_evwsgi`` module.

        Calls ``depr`` with a deprecation message first, then starts the
        server on ``self.host`` / ``self.port`` and registers a thin wrapper
        around *handler* as the WSGI callback.
        """
        depr(0, 13, "fapws3 is not maintained and support will be dropped.")
        import fapws._evwsgi as evwsgi
        from fapws import base, config

        port = self.port
        # The last two characters of SERVER_IDENT are parsed as a number and
        # compared against 0.4 to decide whether the port must be a string.
        # NOTE(review): presumably these characters are the minor version
        # (e.g. ".5") - confirm against fapws3.
        if float(config.SERVER_IDENT[-2:]) > 0.4:
            # fapws3 silently changed its API in 0.5
            port = str(port)
        evwsgi.start(self.host, port)
        # fapws3 never releases the GIL. Complain upstream. I tried. No luck.
        if "BOTTLE_CHILD" in os.environ and not self.quiet:
            _stderr("WARNING: Auto-reloading does not work with Fapws3.")
            _stderr("         (Fapws3 breaks python thread support)")
        evwsgi.set_base_module(base)

        def app(environ, start_response):
            # Force the multiprocess flag off before delegating to the
            # real handler.
            environ["wsgi.multiprocess"] = False
            return handler(environ, start_response)

        # Register the wrapper under the empty path prefix and enter the
        # server loop.
        evwsgi.wsgi_cb(("", app))
        evwsgi.run()

FileCheckerThread

Bases: Thread


              flowchart TD
              sgn.bottle.FileCheckerThread[FileCheckerThread]

              

              click sgn.bottle.FileCheckerThread href "" "sgn.bottle.FileCheckerThread"
            

Interrupt main-thread as soon as a changed module file is detected, the lockfile gets deleted or gets too old.

Source code in src/sgn/bottle.py
class FileCheckerThread(threading.Thread):
    """Interrupt main-thread as soon as a changed module file is detected,
    the lockfile gets deleted or gets too old.

    The instance doubles as a context manager: entering the ``with`` block
    starts the thread, leaving it stops and joins the thread and swallows
    the ``KeyboardInterrupt`` caused by ``thread.interrupt_main()``.

    :param lockfile: Path of the lockfile to watch.
    :param interval: Seconds to sleep between two checks.
    """

    def __init__(self, lockfile, interval):
        threading.Thread.__init__(self)
        self.daemon = True
        self.lockfile, self.interval = lockfile, interval
        #: Is one of 'reload', 'error' or 'exit'
        self.status = None

    def run(self):
        """Poll the lockfile and all loaded module files until a status is
        set, either by this thread or by :meth:`__exit__`."""

        def mtime(path):
            # A single stat() call replaces the racy exists()+stat() pair:
            # a file that vanishes between the two calls would otherwise
            # raise and silently kill this thread.
            try:
                return os.stat(path).st_mtime
            except (OSError, ValueError):
                return None

        # Snapshot the modification times of all module source files that
        # exist right now.
        files = {}
        for module in list(sys.modules.values()):
            path = getattr(module, "__file__", "") or ""
            if path[-4:] in (".pyo", ".pyc"):
                path = path[:-1]
            if not path:
                continue
            stamp = mtime(path)
            if stamp is not None:
                files[path] = stamp

        while not self.status:
            lock_stamp = mtime(self.lockfile)
            if lock_stamp is None or lock_stamp < time.time() - self.interval - 5:
                self.status = "error"
                thread.interrupt_main()
                # Stop here: scanning the module files as well could replace
                # the 'error' status with 'reload' and interrupt the main
                # thread a second time.
                break
            for path, known in files.items():
                current = mtime(path)
                if current is None or current > known:
                    self.status = "reload"
                    thread.interrupt_main()
                    break
            time.sleep(self.interval)

    def __enter__(self):
        """Start the checker thread and return it, so that
        ``with FileCheckerThread(...) as checker`` binds the thread."""
        self.start()
        return self

    def __exit__(self, exc_type, *_):
        """Stop and join the thread.

        Returns true (suppressing the exception) only if the block was left
        through a ``KeyboardInterrupt``.
        """
        if not self.status:
            self.status = "exit"  # silent exit
        self.join()
        return exc_type is not None and issubclass(exc_type, KeyboardInterrupt)

FileUpload

Bases: object


              flowchart TD
              sgn.bottle.FileUpload[FileUpload]

              

              click sgn.bottle.FileUpload href "" "sgn.bottle.FileUpload"
            
Source code in src/sgn/bottle.py
class FileUpload(object):
    """A single uploaded file together with its form field name, the
    client-supplied filename and the headers of its multipart part."""

    def __init__(self, fileobj, name, filename, headers=None):
        """Wrapper for a single file uploaded via ``multipart/form-data``."""
        #: Open file(-like) object (BytesIO buffer or temporary file)
        self.file = fileobj
        #: Name of the upload form field
        self.name = name
        #: Raw filename as sent by the client (may contain unsafe characters)
        self.raw_filename = filename
        #: A :class:`HeaderDict` with additional headers (e.g. content-type)
        if headers:
            self.headers = HeaderDict(headers)
        else:
            self.headers = HeaderDict()

    content_type = HeaderProperty("Content-Type")
    content_length = HeaderProperty("Content-Length", reader=int, default=-1)

    def get_header(self, name, default=None):
        """Look up a header of this multipart part, or return *default*."""
        part_headers = self.headers
        return part_headers.get(name, default)

    @cached_property
    def filename(self):
        """Sanitized version of the client-side file name, safe to use on
        the local file system. An empty result is returned as 'empty'.

        Only ASCII letters, digits, dashes, underscores and dots are
        allowed in the final filename. Accents are removed, if possible.
        Whitespace is replaced by a single dash. Leading or trailing dots
        or dashes are removed. The filename is limited to 255 characters.
        """
        raw = self.raw_filename
        if not isinstance(raw, unicode):
            raw = raw.decode("utf8", "ignore")
        # Decompose accented characters, then drop everything non-ASCII.
        ascii_name = normalize("NFKD", raw).encode("ASCII", "ignore").decode("ASCII")
        # Treat backslashes as path separators and keep the last component.
        base = os.path.basename(ascii_name.replace("\\", os.path.sep))
        cleaned = re.sub(r"[^a-zA-Z0-9-_.\s]", "", base).strip()
        cleaned = re.sub(r"[-\s]+", "-", cleaned).strip(".-")
        return cleaned[:255] or "empty"

    def _copy_file(self, fp, chunk_size=2**16):
        """Copy the rest of the upload to *fp* in chunks and restore the
        read position of the upload afterwards."""
        source, emit = self.file, fp.write
        start = source.tell()
        chunk = source.read(chunk_size)
        while chunk:
            emit(chunk)
            chunk = source.read(chunk_size)
        source.seek(start)

    def save(self, destination, overwrite=False, chunk_size=2**16):
        """Write the upload to disk or into an open file(-like) object.
        If *destination* is a directory, :attr:`filename` is added to the
        path. Existing files are not overwritten by default (IOError).

        :param destination: File path, directory or file(-like) object.
        :param overwrite: If True, replace existing files. (default: False)
        :param chunk_size: Bytes to read at a time. (default: 64kb)
        """
        if not isinstance(destination, basestring):
            # Anything that is not a path string is used as a file-like.
            self._copy_file(destination, chunk_size)
            return
        target = destination
        if os.path.isdir(target):
            target = os.path.join(target, self.filename)
        if not overwrite and os.path.exists(target):
            raise IOError("File exists.")
        with open(target, "wb") as fp:
            self._copy_file(fp, chunk_size)

__init__(fileobj, name, filename, headers=None)

Wrapper for a single file uploaded via multipart/form-data.

Source code in src/sgn/bottle.py
def __init__(self, fileobj, name, filename, headers=None):
    """Wrapper for a single file uploaded via ``multipart/form-data``."""
    #: Open file(-like) object (BytesIO buffer or temporary file)
    self.file = fileobj
    #: Name of the upload form field
    self.name = name
    #: Raw filename as sent by the client (may contain unsafe characters)
    self.raw_filename = filename
    #: A :class:`HeaderDict` with additional headers (e.g. content-type)
    if headers:
        self.headers = HeaderDict(headers)
    else:
        self.headers = HeaderDict()

filename()

Name of the file on the client file system, but normalized to ensure file system compatibility. An empty filename is returned as 'empty'.

Only ASCII letters, digits, dashes, underscores and dots are allowed in the final filename. Accents are removed, if possible. Whitespace is replaced by a single dash. Leading or trailing dots or dashes are removed. The filename is limited to 255 characters.

Source code in src/sgn/bottle.py
@cached_property
def filename(self):
    """Sanitized version of the client-side file name, safe to use on
    the local file system. An empty result is returned as 'empty'.

    Only ASCII letters, digits, dashes, underscores and dots are
    allowed in the final filename. Accents are removed, if possible.
    Whitespace is replaced by a single dash. Leading or trailing dots
    or dashes are removed. The filename is limited to 255 characters.
    """
    raw = self.raw_filename
    if not isinstance(raw, unicode):
        raw = raw.decode("utf8", "ignore")
    # Decompose accented characters, then drop everything non-ASCII.
    ascii_name = normalize("NFKD", raw).encode("ASCII", "ignore").decode("ASCII")
    # Treat backslashes as path separators and keep the last component.
    base = os.path.basename(ascii_name.replace("\\", os.path.sep))
    cleaned = re.sub(r"[^a-zA-Z0-9-_.\s]", "", base).strip()
    cleaned = re.sub(r"[-\s]+", "-", cleaned).strip(".-")
    return cleaned[:255] or "empty"

get_header(name, default=None)

Return the value of a header within the multipart part.

Source code in src/sgn/bottle.py
def get_header(self, name, default=None):
    """Look up a header of this multipart part, or return *default*."""
    part_headers = self.headers
    return part_headers.get(name, default)

save(destination, overwrite=False, chunk_size=2 ** 16)

Save file to disk or copy its content to an open file(-like) object. If destination is a directory, :attr:filename is added to the path. Existing files are not overwritten by default (IOError).

:param destination: File path, directory or file(-like) object. :param overwrite: If True, replace existing files. (default: False) :param chunk_size: Bytes to read at a time. (default: 64kb)

Source code in src/sgn/bottle.py
def save(self, destination, overwrite=False, chunk_size=2**16):
    """Write the upload to disk or into an open file(-like) object.
    If *destination* is a directory, :attr:`filename` is added to the
    path. Existing files are not overwritten by default (IOError).

    :param destination: File path, directory or file(-like) object.
    :param overwrite: If True, replace existing files. (default: False)
    :param chunk_size: Bytes to read at a time. (default: 64kb)
    """
    if not isinstance(destination, basestring):
        # Anything that is not a path string is used as a file-like.
        self._copy_file(destination, chunk_size)
        return
    target = destination
    if os.path.isdir(target):
        target = os.path.join(target, self.filename)
    if not overwrite and os.path.exists(target):
        raise IOError("File exists.")
    with open(target, "wb") as fp:
        self._copy_file(fp, chunk_size)

FormsDict

Bases: MultiDict


              flowchart TD
              sgn.bottle.FormsDict[FormsDict]
              sgn.bottle.MultiDict[MultiDict]

                              sgn.bottle.MultiDict --> sgn.bottle.FormsDict
                


              click sgn.bottle.FormsDict href "" "sgn.bottle.FormsDict"
              click sgn.bottle.MultiDict href "" "sgn.bottle.MultiDict"
            

This :class:MultiDict subclass is used to store request form data. In addition to the normal dict-like item access methods (which return unmodified data as native strings), this container also supports attribute-like access to its values. Attributes are automatically de- or recoded to match :attr:input_encoding (default: 'utf8'). Missing attributes default to an empty string.

Source code in src/sgn/bottle.py
class FormsDict(MultiDict):
    """This :class:`MultiDict` subclass is used to store request form data.
    In addition to the normal dict-like item access methods (which return
    unmodified data as native strings), this container also supports
    attribute-like access to its values. Attributes are automatically de-
    or recoded to match :attr:`input_encoding` (default: 'utf8'). Missing
    attributes default to an empty string."""

    #: Encoding used for attribute values.
    input_encoding = "utf8"
    #: If true (default), unicode strings are first encoded with `latin1`
    #: and then decoded to match :attr:`input_encoding`.
    recode_unicode = True

    def _fix(self, s, encoding=None):
        """De- or recode a single value; other types pass through as-is."""
        target = encoding or self.input_encoding
        if isinstance(s, unicode) and self.recode_unicode:  # Python 3 WSGI
            return s.encode("latin1").decode(target)
        if isinstance(s, bytes):  # Python 2 WSGI
            return s.decode(target)
        return s

    def decode(self, encoding=None):
        """Build a new :class:`FormsDict` in which every key and value has
        been de- or recoded to *encoding* (default: :attr:`input_encoding`).
        Some libraries (e.g. WTForms) want a unicode dictionary."""
        target = encoding or self.input_encoding
        result = FormsDict()
        result.input_encoding = target
        # The copy already holds recoded text; it must not recode again.
        result.recode_unicode = False
        for name, item in self.allitems():
            result.append(self._fix(name, target), self._fix(item, target))
        return result

    def getunicode(self, name, default=None, encoding=None):
        """Return the de- or recoded value for *name*, or *default* if the
        key is missing or the value cannot be converted."""
        try:
            raw = self[name]
            return self._fix(raw, encoding)
        except (UnicodeError, KeyError):
            return default

    def __getattr__(self, name, default=unicode()):
        # Without this guard, pickle generates a cryptic TypeError:
        is_dunder = name.startswith("__") and name.endswith("__")
        if is_dunder:
            return super(FormsDict, self).__getattr__(name)
        return self.getunicode(name, default=default)

decode(encoding=None)

Returns a copy with all keys and values de- or recoded to match :attr:input_encoding. Some libraries (e.g. WTForms) want a unicode dictionary.

Source code in src/sgn/bottle.py
def decode(self, encoding=None):
    """Build a new :class:`FormsDict` in which every key and value has
    been de- or recoded to *encoding* (default: :attr:`input_encoding`).
    Some libraries (e.g. WTForms) want a unicode dictionary."""
    target = encoding or self.input_encoding
    result = FormsDict()
    result.input_encoding = target
    # The copy already holds recoded text; it must not recode again.
    result.recode_unicode = False
    for name, item in self.allitems():
        result.append(self._fix(name, target), self._fix(item, target))
    return result

getunicode(name, default=None, encoding=None)

Return the value as a unicode string, or the default.

Source code in src/sgn/bottle.py
def getunicode(self, name, default=None, encoding=None):
    """Return the de- or recoded value for *name*, or *default* if the
    key is missing or the value cannot be converted."""
    try:
        raw = self[name]
        return self._fix(raw, encoding)
    except (UnicodeError, KeyError):
        return default

GeventServer

Bases: ServerAdapter


              flowchart TD
              sgn.bottle.GeventServer[GeventServer]
              sgn.bottle.ServerAdapter[ServerAdapter]

                              sgn.bottle.ServerAdapter --> sgn.bottle.GeventServer
                


              click sgn.bottle.GeventServer href "" "sgn.bottle.GeventServer"
              click sgn.bottle.ServerAdapter href "" "sgn.bottle.ServerAdapter"
            

Untested. Options:

  • See gevent.wsgi.WSGIServer() documentation for more options.
Source code in src/sgn/bottle.py
class GeventServer(ServerAdapter):
    """Untested. Options:

    * See gevent.wsgi.WSGIServer() documentation for more options.
    """

    def run(self, handler):
        """Serve *handler* with ``gevent.pywsgi.WSGIServer``.

        :raises RuntimeError: If ``threading.local`` has not been replaced
            by gevent's implementation.
        """
        from gevent import local, pywsgi

        if not isinstance(threading.local(), local.local):
            raise RuntimeError(
                "Bottle requires gevent.monkey.patch_all() (before import)"
            )
        if self.quiet:
            self.options["log"] = None
        server = pywsgi.WSGIServer((self.host, self.port), handler, **self.options)
        if "BOTTLE_CHILD" in os.environ:
            import signal

            def stop_on_sigint(signum, frame):
                # Stop the server when the process receives SIGINT.
                server.stop()

            signal.signal(signal.SIGINT, stop_on_sigint)
        server.serve_forever()

GunicornServer

Bases: ServerAdapter


              flowchart TD
              sgn.bottle.GunicornServer[GunicornServer]
              sgn.bottle.ServerAdapter[ServerAdapter]

                              sgn.bottle.ServerAdapter --> sgn.bottle.GunicornServer
                


              click sgn.bottle.GunicornServer href "" "sgn.bottle.GunicornServer"
              click sgn.bottle.ServerAdapter href "" "sgn.bottle.ServerAdapter"
            

Untested. See http://gunicorn.org/configure.html for options.

Source code in src/sgn/bottle.py
class GunicornServer(ServerAdapter):
    """Untested. See http://gunicorn.org/configure.html for options."""

    def run(self, handler):
        """Run *handler* inside a gunicorn ``BaseApplication``.

        A host starting with ``unix:`` is used verbatim as the bind
        address; otherwise ``host:port`` is used. Entries of
        ``self.options`` are added to the config and may override ``bind``.
        """
        from gunicorn.app.base import BaseApplication

        if self.host.startswith("unix:"):
            bind = self.host
        else:
            bind = "%s:%d" % (self.host, self.port)
        settings = {"bind": bind}
        settings.update(self.options)

        class GunicornApplication(BaseApplication):
            def load_config(self):
                for option in settings:
                    self.cfg.set(option, settings[option])

            def load(self):
                return handler

        application = GunicornApplication()
        application.run()

HTTPError

Bases: HTTPResponse


              flowchart TD
              sgn.bottle.HTTPError[HTTPError]
              sgn.bottle.HTTPResponse[HTTPResponse]
              sgn.bottle.BottleException[BottleException]

                              sgn.bottle.HTTPResponse --> sgn.bottle.HTTPError
                                sgn.bottle.Response --> sgn.bottle.HTTPResponse
                
                sgn.bottle.BottleException --> sgn.bottle.HTTPResponse
                



              click sgn.bottle.HTTPError href "" "sgn.bottle.HTTPError"
              click sgn.bottle.HTTPResponse href "" "sgn.bottle.HTTPResponse"
              click sgn.bottle.BottleException href "" "sgn.bottle.BottleException"
            

A subclass of :class:HTTPResponse that triggers error handlers.

Source code in src/sgn/bottle.py
class HTTPError(HTTPResponse):
    """An :class:`HTTPResponse` subclass that triggers error handlers.

    Keeps the originating exception and traceback (if any) on the
    instance and defaults to status 500.
    """

    default_status = 500

    def __init__(
        self, status=None, body=None, exception=None, traceback=None, **more_headers
    ):
        self.exception, self.traceback = exception, traceback
        # The parent initializer takes the body first, then the status.
        parent = super(HTTPError, self)
        parent.__init__(body, status, **more_headers)

HTTPResponse

Bases: Response, BottleException


              flowchart TD
              sgn.bottle.HTTPResponse[HTTPResponse]
              sgn.bottle.BottleException[BottleException]

                              sgn.bottle.Response --> sgn.bottle.HTTPResponse
                
                sgn.bottle.BottleException --> sgn.bottle.HTTPResponse
                


              click sgn.bottle.HTTPResponse href "" "sgn.bottle.HTTPResponse"
              click sgn.bottle.BottleException href "" "sgn.bottle.BottleException"
            

A subclass of :class:Response that can be raised or returned from request handlers to short-circuit request processing and override changes made to the global :data:request object. This bypasses error handlers, even if the status code indicates an error. Return or raise :class:HTTPError to trigger error handlers.

Source code in src/sgn/bottle.py
class HTTPResponse(Response, BottleException):
    """A subclass of :class:`Response` that can be raised or returned from request
    handlers to short-circuit request processing and override changes made to the
    global :data:`request` object. This bypasses error handlers, even if the status
    code indicates an error. Return or raise :class:`HTTPError` to trigger error
    handlers.
    """

    def __init__(self, body="", status=None, headers=None, **more_headers):
        parent = super(HTTPResponse, self)
        parent.__init__(body, status, headers, **more_headers)

    def apply(self, other):
        """Transfer status, headers, cookies and body of this response onto
        another :class:`Response` object. The values are assigned, not
        copied, so both objects reference the same header container."""
        for attr in ("_status_code", "_status_line", "_headers", "_cookies", "body"):
            setattr(other, attr, getattr(self, attr))

apply(other)

Copy the state of this response to a different :class:Response object.

Source code in src/sgn/bottle.py
def apply(self, other):
    """Transfer status, headers, cookies and body of this response onto
    another :class:`Response` object. The values are assigned, not
    copied, so both objects reference the same header container."""
    for attr in ("_status_code", "_status_line", "_headers", "_cookies", "body"):
        setattr(other, attr, getattr(self, attr))

HeaderDict

Bases: MultiDict


              flowchart TD
              sgn.bottle.HeaderDict[HeaderDict]
              sgn.bottle.MultiDict[MultiDict]

                              sgn.bottle.MultiDict --> sgn.bottle.HeaderDict
                


              click sgn.bottle.HeaderDict href "" "sgn.bottle.HeaderDict"
              click sgn.bottle.MultiDict href "" "sgn.bottle.MultiDict"
            

A case-insensitive version of :class:MultiDict that defaults to replace the old value instead of appending it.

Source code in src/sgn/bottle.py
class HeaderDict(MultiDict):
    """A case-insensitive version of :class:`MultiDict` that defaults to
    replace the old value instead of appending it.

    Every key is passed through ``_hkey`` and every stored value through
    ``_hval`` before it reaches the underlying dict.
    """

    def __init__(self, *a, **ka):
        self.dict = {}
        if a or ka:
            self.update(*a, **ka)

    def __contains__(self, key):
        normalized = _hkey(key)
        return normalized in self.dict

    def __delitem__(self, key):
        normalized = _hkey(key)
        del self.dict[normalized]

    def __getitem__(self, key):
        # The newest value is the last one in the list.
        stored = self.dict[_hkey(key)]
        return stored[-1]

    def __setitem__(self, key, value):
        # Assignment replaces all previous values for this header.
        self.dict[_hkey(key)] = [_hval(value)]

    def append(self, key, value):
        """Add *value* to the values already stored for *key*."""
        stored = self.dict.setdefault(_hkey(key), [])
        stored.append(_hval(value))

    def replace(self, key, value):
        """Drop all values stored for *key* and store only *value*."""
        self.dict[_hkey(key)] = [_hval(value)]

    def getall(self, key):
        """Return the list of all values for *key* (empty if unknown)."""
        return self.dict.get(_hkey(key)) or []

    def get(self, key, default=None, index=-1):
        normalized = _hkey(key)
        return MultiDict.get(self, normalized, default, index)

    def filter(self, names):
        """Remove every header listed in *names*; unknown names are
        ignored."""
        for raw_name in names:
            self.dict.pop(_hkey(raw_name), None)

LocalRequest

Bases: BaseRequest


              flowchart TD
              sgn.bottle.LocalRequest[LocalRequest]
              sgn.bottle.BaseRequest[BaseRequest]

                              sgn.bottle.BaseRequest --> sgn.bottle.LocalRequest
                


              click sgn.bottle.LocalRequest href "" "sgn.bottle.LocalRequest"
              click sgn.bottle.BaseRequest href "" "sgn.bottle.BaseRequest"
            

A thread-local subclass of :class:BaseRequest with a different set of attributes for each thread. There is usually only one global instance of this class (:data:request). If accessed during a request/response cycle, this instance always refers to the current request (even on a multithreaded server).

Source code in src/sgn/bottle.py
class LocalRequest(BaseRequest):
    """A thread-local subclass of :class:`BaseRequest` with a different
    set of attributes for each thread. There is usually only one global
    instance of this class (:data:`request`). If accessed during a
    request/response cycle, this instance always refers to the *current*
    request (even on a multithreaded server)."""

    # ``bind`` is an alias for the base-class initializer, so an existing
    # instance can be (re-)initialized by calling ``bind(...)``.
    bind = BaseRequest.__init__
    # NOTE(review): _local_property is defined elsewhere in this file;
    # presumably it keeps a separate ``environ`` per thread - confirm.
    environ = _local_property()

LocalResponse

Bases: BaseResponse


              flowchart TD
              sgn.bottle.LocalResponse[LocalResponse]
              sgn.bottle.BaseResponse[BaseResponse]

                              sgn.bottle.BaseResponse --> sgn.bottle.LocalResponse
                


              click sgn.bottle.LocalResponse href "" "sgn.bottle.LocalResponse"
              click sgn.bottle.BaseResponse href "" "sgn.bottle.BaseResponse"
            

A thread-local subclass of :class:BaseResponse with a different set of attributes for each thread. There is usually only one global instance of this class (:data:response). Its attributes are used to build the HTTP response at the end of the request/response cycle.

Source code in src/sgn/bottle.py
class LocalResponse(BaseResponse):
    """A thread-local subclass of :class:`BaseResponse` with a different
    set of attributes for each thread. There is usually only one global
    instance of this class (:data:`response`). Its attributes are used
    to build the HTTP response at the end of the request/response cycle.
    """

    # ``bind`` is the base-class initializer under a second name, so an
    # already existing instance can be (re)initialized via ``bind(...)``.
    bind = BaseResponse.__init__
    # The response state below is routed through ``_local_property``.
    # NOTE(review): the helper is defined elsewhere in this file; presumably
    # it keeps one value per thread -- confirm against the helper.
    _status_line = _local_property()
    _status_code = _local_property()
    _cookies = _local_property()
    _headers = _local_property()
    body = _local_property()

MultiDict

Bases: MutableMapping


              flowchart TD
              sgn.bottle.MultiDict[MultiDict]

              

              click sgn.bottle.MultiDict href "" "sgn.bottle.MultiDict"
            

This dict stores multiple values per key, but behaves exactly like a normal dict in that it returns only the newest value for any given key. There are special methods available to access the full list of values.

Source code in src/sgn/bottle.py
class MultiDict(DictMixin):
    """A mapping that keeps a list of values for every key.

    Plain dict-style access only exposes the newest value stored for a key.
    The complete value lists are reachable through dedicated methods such
    as :meth:`getall` and :meth:`allitems`.
    """

    def __init__(self, *a, **k):
        seed = dict(*a, **k)
        # Every initial value starts out as a one-element value list.
        self.dict = {key: [value] for key, value in seed.items()}

    def __len__(self):
        return len(self.dict)

    def __iter__(self):
        return iter(self.dict)

    def __contains__(self, key):
        return key in self.dict

    def __delitem__(self, key):
        del self.dict[key]

    def __getitem__(self, key):
        stored = self.dict[key]
        return stored[-1]

    def __setitem__(self, key, value):
        # Item assignment never overwrites; it adds a newer value.
        self.append(key, value)

    def keys(self):
        return self.dict.keys()

    if py3k:

        def values(self):
            return (stored[-1] for stored in self.dict.values())

        def items(self):
            return ((key, stored[-1]) for key, stored in self.dict.items())

        def allitems(self):
            return (
                (key, value)
                for key, stored in self.dict.items()
                for value in stored
            )

        # The Python 2 style ``iter*`` names are plain aliases here.
        iterkeys = keys
        itervalues = values
        iteritems = items
        iterallitems = allitems

    else:

        def values(self):
            return [stored[-1] for stored in self.dict.values()]

        def items(self):
            return [(key, stored[-1]) for key, stored in self.dict.items()]

        def iterkeys(self):
            return self.dict.iterkeys()

        def itervalues(self):
            return (stored[-1] for stored in self.dict.itervalues())

        def iteritems(self):
            return ((key, stored[-1]) for key, stored in self.dict.iteritems())

        def iterallitems(self):
            return (
                (key, value)
                for key, stored in self.dict.iteritems()
                for value in stored
            )

        def allitems(self):
            return [
                (key, value)
                for key, stored in self.dict.iteritems()
                for value in stored
            ]

    def get(self, key, default=None, index=-1, type=None):
        """Return one value stored for a key (the newest one by default).

        :param default: Returned when the key is missing, the index is out
               of range or the type conversion fails.
        :param index: Position within the list of values stored for the key.
        :param type: Optional callable applied to the value before it is
                returned. Any exception it raises is suppressed and the
                default is returned instead.
        """
        try:
            found = self.dict[key][index]
            if type:
                return type(found)
            return found
        except Exception:
            # Deliberately broad: lookup and conversion errors both fall
            # back to the default value.
            return default

    def append(self, key, value):
        """Add a new value to the list of values for this key."""
        stored = self.dict.setdefault(key, [])
        stored.append(value)

    def replace(self, key, value):
        """Replace the list of values with a single value."""
        self.dict[key] = [value]

    def getall(self, key):
        """Return a (possibly empty) list of values for a key."""
        stored = self.dict.get(key)
        return stored if stored else []

    #: Aliases for WTForms to mimic other multi-dict APIs (Django)
    getone = get
    getlist = getall

append(key, value)

Add a new value to the list of values for this key.

Source code in src/sgn/bottle.py
def append(self, key, value):
    """Append ``value`` to the values stored for ``key``, creating the
    value list first if the key is new."""
    stored = self.dict.setdefault(key, [])
    stored.append(value)

get(key, default=None, index=-1, type=None)

Return the most recent value for a key.

:param default: The default value to be returned if the key is not present or the type conversion fails. :param index: An index for the list of available values. :param type: If defined, this callable is used to cast the value into a specific type. Exceptions are suppressed and result in the default value being returned.

Source code in src/sgn/bottle.py
def get(self, key, default=None, index=-1, type=None):
    """Return one value stored for a key (the newest one by default).

    :param default: Returned when the key is missing, the index is out of
           range or the type conversion fails.
    :param index: Position within the list of values stored for the key.
    :param type: Optional callable applied to the value before it is
            returned. Any exception it raises is suppressed and the
            default is returned instead.
    """
    try:
        found = self.dict[key][index]
        if type:
            return type(found)
        return found
    except Exception:
        # Deliberately broad: lookup and conversion errors both fall back
        # to the default value.
        return default

getall(key)

Return a (possibly empty) list of values for a key.

Source code in src/sgn/bottle.py
def getall(self, key):
    """Return the list of values stored for ``key``; an empty list if the
    key is unknown."""
    stored = self.dict.get(key)
    return stored if stored else []

replace(key, value)

Replace the list of values with a single value.

Source code in src/sgn/bottle.py
def replace(self, key, value):
    """Discard every value stored for ``key`` and store only ``value``."""
    fresh = [value]
    self.dict[key] = fresh

ResourceManager

Bases: object


              flowchart TD
              sgn.bottle.ResourceManager[ResourceManager]

              

              click sgn.bottle.ResourceManager href "" "sgn.bottle.ResourceManager"
            

This class manages a list of search paths and helps to find and open application-bound resources (files).

:param base: default value for :meth:add_path calls. :param opener: callable used to open resources. :param cachemode: controls which lookups are cached. One of 'all', 'found' or 'none'.

Source code in src/sgn/bottle.py
class ResourceManager(object):
    """This class manages a list of search paths and helps to find and open
    application-bound resources (files).

    :param base: default value for :meth:`add_path` calls.
    :param opener: callable used to open resources.
    :param cachemode: controls which lookups are cached. One of 'all',
                     'found' or 'none'.
    """

    def __init__(self, base="./", opener=open, cachemode="all"):
        self.opener = opener
        self.base = base
        self.cachemode = cachemode

        #: A list of search paths. See :meth:`add_path` for details.
        self.path = []
        #: A cache for resolved paths. ``res.cache.clear()`` clears the cache.
        self.cache = {}

    def add_path(self, path, base=None, index=None, create=False):
        """Add a new path to the list of search paths. Return False if the
        path does not exist.

        :param path: The new search path. Relative paths are turned into
            an absolute and normalized form. If the path looks like a file
            (not ending in `/`), the filename is stripped off.
        :param base: Path used to absolutize relative search paths.
            Defaults to :attr:`base` which defaults to ``os.getcwd()``.
        :param index: Position within the list of search paths. Defaults
            to last index (appends to the list).
        :param create: If true and the resulting directory does not exist,
            it is created with ``os.makedirs``.

        The `base` parameter makes it easy to reference files installed
        along with a python module or package::

            res.add_path('./resources/', __file__)
        """
        base = os.path.abspath(os.path.dirname(base or self.base))
        path = os.path.abspath(os.path.join(base, os.path.dirname(path)))
        path += os.sep
        # Re-adding a known path moves it instead of duplicating it.
        if path in self.path:
            self.path.remove(path)
        if create and not os.path.isdir(path):
            os.makedirs(path)
        if index is None:
            self.path.append(path)
        else:
            self.path.insert(index, path)
        # Cached results may be wrong once the search order has changed.
        self.cache.clear()
        return os.path.exists(path)

    def __iter__(self):
        """Iterate over all existing files in all registered paths."""
        search = self.path[:]
        while search:
            path = search.pop()
            if not os.path.isdir(path):
                continue
            for name in os.listdir(path):
                full = os.path.join(path, name)
                if os.path.isdir(full):
                    # Sub-directories are queued and walked later.
                    search.append(full)
                else:
                    yield full

    def lookup(self, name):
        """Search for a resource and return an absolute file path, or `None`.

        The :attr:`path` list is searched in order. The first match is
        returned. Symlinks are followed. The result is cached to speed up
        future lookups (subject to :attr:`cachemode`)."""
        if name not in self.cache or DEBUG:
            for path in self.path:
                fpath = os.path.join(path, name)
                if os.path.isfile(fpath):
                    if self.cachemode in ("all", "found"):
                        self.cache[name] = fpath
                    return fpath
            if self.cachemode == "all":
                self.cache[name] = None
        # With cachemode 'found' or 'none' a miss is never stored, so a
        # plain ``self.cache[name]`` would raise KeyError instead of
        # returning the documented ``None``.
        return self.cache.get(name)

    def open(self, name, mode="r", *args, **kwargs):
        """Find a resource and return a file object, or raise IOError."""
        fname = self.lookup(name)
        if not fname:
            raise IOError("Resource %r not found." % name)
        return self.opener(fname, mode=mode, *args, **kwargs)

__iter__()

Iterate over all existing files in all registered paths.

Source code in src/sgn/bottle.py
def __iter__(self):
    """Yield the full path of every file found below the registered search
    paths, descending into sub-directories."""
    pending = list(self.path)
    while pending:
        current = pending.pop()
        if os.path.isdir(current):
            for entry in os.listdir(current):
                candidate = os.path.join(current, entry)
                if not os.path.isdir(candidate):
                    yield candidate
                else:
                    # Sub-directories are queued and walked later.
                    pending.append(candidate)

add_path(path, base=None, index=None, create=False)

Add a new path to the list of search paths. Return False if the path does not exist.

:param path: The new search path. Relative paths are turned into an absolute and normalized form. If the path looks like a file (not ending in /), the filename is stripped off. :param base: Path used to absolutize relative search paths. Defaults to :attr:base which defaults to os.getcwd(). :param index: Position within the list of search paths. Defaults to last index (appends to the list).

The base parameter makes it easy to reference files installed along with a python module or package::

res.add_path('./resources/', __file__)
Source code in src/sgn/bottle.py
def add_path(self, path, base=None, index=None, create=False):
    """Register a directory as a search path and report whether it exists.

    :param path: The new search path. Relative paths are made absolute and
        normalized. Anything after the last `/` is treated as a filename
        and stripped off.
    :param base: Path used to absolutize relative search paths. Falls back
        to :attr:`base` when omitted.
    :param index: Position within the list of search paths. When omitted
        the path is appended to the end of the list.
    :param create: If true and the resulting directory does not exist, it
        is created with ``os.makedirs``.

    The `base` parameter makes it easy to reference files installed
    along with a python module or package::

        res.add_path('./resources/', __file__)
    """
    base_dir = os.path.abspath(os.path.dirname(base or self.base))
    joined = os.path.join(base_dir, os.path.dirname(path))
    search_dir = os.path.abspath(joined) + os.sep
    # Re-adding a known path moves it instead of duplicating it.
    if search_dir in self.path:
        self.path.remove(search_dir)
    if create and not os.path.isdir(search_dir):
        os.makedirs(search_dir)
    if index is not None:
        self.path.insert(index, search_dir)
    else:
        self.path.append(search_dir)
    # Cached results may be wrong once the search order has changed.
    self.cache.clear()
    return os.path.exists(search_dir)

lookup(name)

Search for a resource and return an absolute file path, or None.

The :attr:path list is searched in order. The first match is returned. Symlinks are followed. The result is cached to speed up future lookups.

Source code in src/sgn/bottle.py
def lookup(self, name):
    """Search for a resource and return an absolute file path, or `None`.

    The :attr:`path` list is searched in order. The first match is
    returned. Symlinks are followed. The result is cached to speed up
    future lookups (subject to :attr:`cachemode`)."""
    if name not in self.cache or DEBUG:
        for path in self.path:
            fpath = os.path.join(path, name)
            if os.path.isfile(fpath):
                if self.cachemode in ("all", "found"):
                    self.cache[name] = fpath
                return fpath
        if self.cachemode == "all":
            self.cache[name] = None
    # With cachemode 'found' or 'none' a miss is never stored, so a plain
    # ``self.cache[name]`` would raise KeyError instead of returning the
    # documented ``None``.
    return self.cache.get(name)

open(name, mode='r', *args, **kwargs)

Find a resource and return a file object, or raise IOError.

Source code in src/sgn/bottle.py
def open(self, name, mode="r", *args, **kwargs):
    """Resolve ``name`` through :meth:`lookup` and hand the resulting path
    to the configured opener. Raise IOError if the resource is not found."""
    resolved = self.lookup(name)
    if resolved:
        return self.opener(resolved, *args, mode=mode, **kwargs)
    raise IOError("Resource %r not found." % name)

Route

Bases: object


              flowchart TD
              sgn.bottle.Route[Route]

              

              click sgn.bottle.Route href "" "sgn.bottle.Route"
            

This class wraps a route callback along with route specific metadata and configuration and applies Plugins on demand. It is also responsible for turning a URL path rule into a regular expression usable by the Router.

Source code in src/sgn/bottle.py
class Route(object):
    """This class wraps a route callback along with route specific metadata and
    configuration and applies Plugins on demand. It is also responsible for
    turning a URL path rule into a regular expression usable by the Router.
    """

    def __init__(
        self,
        app,
        rule,
        method,
        callback,
        name=None,
        plugins=None,
        skiplist=None,
        **config
    ):
        #: The application this route is installed to.
        self.app = app
        #: The path-rule string (e.g. ``/wiki/<page>``).
        self.rule = rule
        #: The HTTP method as a string (e.g. ``GET``).
        self.method = method
        #: The original callback with no plugins applied. Useful for introspection.
        self.callback = callback
        #: The name of the route (if specified) or ``None``.
        self.name = name or None
        #: A list of route-specific plugins (see :meth:`Bottle.route`).
        self.plugins = plugins or []
        #: A list of plugins to not apply to this route (see :meth:`Bottle.route`).
        self.skiplist = skiplist or []
        #: Additional keyword arguments passed to the :meth:`Bottle.route`
        #: decorator are stored in this dictionary. Used for route-specific
        #: plugin configuration and meta-data.
        self.config = app.config._make_overlay()
        self.config.load_dict(config)

    @cached_property
    def call(self):
        """The route callback with all plugins applied. This property is
        created on demand and then cached to speed up subsequent requests."""
        return self._make_callback()

    def reset(self):
        """Forget any cached values. The next time :attr:`call` is accessed,
        all plugins are re-applied."""
        self.__dict__.pop("call", None)

    def prepare(self):
        """Do all on-demand work immediately (useful for debugging)."""
        self.call

    def all_plugins(self):
        """Yield all Plugins affecting this route.

        Plugins are visited in reverse order (route-specific plugins before
        application plugins, newest first). Of several plugins sharing the
        same ``name`` only the first one visited is yielded. ``True`` in the
        skiplist disables all plugins."""
        unique = set()
        for p in reversed(self.app.plugins + self.plugins):
            if True in self.skiplist:
                break
            name = getattr(p, "name", False)
            if name and (name in self.skiplist or name in unique):
                continue
            if p in self.skiplist or type(p) in self.skiplist:
                continue
            if name:
                unique.add(name)
            yield p

    def _make_callback(self):
        """Wrap the original callback with every plugin from
        :meth:`all_plugins` and return the result."""
        callback = self.callback
        for plugin in self.all_plugins():
            # Plugins are either objects with an ``apply`` method or plain
            # decorators.
            if hasattr(plugin, "apply"):
                callback = plugin.apply(callback, self)
            else:
                callback = plugin(callback)
            if callback is not self.callback:
                update_wrapper(callback, self.callback)
        return callback

    def get_undecorated_callback(self):
        """Return the callback. If the callback is a decorated function, try to
        recover the original function.

        The closure chain is followed as long as it leads to another plain
        function. A function that only closes over non-function values is
        returned as it is."""
        func = self.callback
        func = getattr(func, "__func__" if py3k else "im_func", func)
        closure_attr = "__closure__" if py3k else "func_closure"
        while getattr(func, closure_attr, None):
            cells = getattr(func, closure_attr)
            inner = cells[0].cell_contents

            # in case of decorators with multiple arguments
            if not isinstance(inner, FunctionType):
                # pick first FunctionType instance from multiple arguments
                inner = next(
                    (
                        content
                        for content in (cell.cell_contents for cell in cells)
                        if isinstance(content, FunctionType)
                    ),
                    None,
                )
                if inner is None:
                    # The closure holds no function at all, so ``func`` is
                    # an ordinary closure and not a decorator wrapper.
                    # Indexing an empty list here used to raise IndexError.
                    break
            func = inner
        return func

    def get_callback_args(self):
        """Return a list of argument names the callback (most likely) accepts
        as keyword arguments. If the callback is a decorated function, try
        to recover the original function before inspection."""
        return getargspec(self.get_undecorated_callback())[0]

    def get_config(self, key, default=None):
        """Lookup a config field and return its value, first checking the
        route.config, then route.app.config."""
        depr(
            0,
            13,
            "Route.get_config() is deprecated.",
            "The Route.config property already includes values from the"
            " application config for missing keys. Access it directly.",
        )
        return self.config.get(key, default)

    def __repr__(self):
        cb = self.get_undecorated_callback()
        return "<%s %s -> %s:%s>" % (self.method, self.rule, cb.__module__, cb.__name__)

all_plugins()

Yield all Plugins affecting this route.

Source code in src/sgn/bottle.py
def all_plugins(self):
    """Yield every plugin that applies to this route.

    Plugins are visited in reverse order (route-specific plugins before
    application plugins, newest first). Of several plugins sharing the same
    ``name`` only the first one visited is yielded. ``True`` in the
    skiplist disables all plugins."""
    seen_names = set()
    for plugin in reversed(self.app.plugins + self.plugins):
        if True in self.skiplist:
            return
        plugin_name = getattr(plugin, "name", False)
        if plugin_name:
            # Skipped by name, or shadowed by a plugin of the same name.
            if plugin_name in self.skiplist or plugin_name in seen_names:
                continue
        # Skipped by instance or by type.
        if plugin in self.skiplist or type(plugin) in self.skiplist:
            continue
        if plugin_name:
            seen_names.add(plugin_name)
        yield plugin

call()

The route callback with all plugins applied. This property is created on demand and then cached to speed up subsequent requests.

Source code in src/sgn/bottle.py
@cached_property
def call(self):
    """The plugin-wrapped route callback. It is built on first access and
    then cached, so later requests reuse the same callable."""
    wrapped = self._make_callback()
    return wrapped

get_callback_args()

Return a list of argument names the callback (most likely) accepts as keyword arguments. If the callback is a decorated function, try to recover the original function before inspection.

Source code in src/sgn/bottle.py
def get_callback_args(self):
    """Return the argument names the callback (most likely) accepts as
    keyword arguments. The undecorated callback is inspected, so wrappers
    added by decorators do not hide the real signature."""
    original = self.get_undecorated_callback()
    spec = getargspec(original)
    # The first element of the argspec holds the argument names.
    return spec[0]

get_config(key, default=None)

Lookup a config field and return its value, first checking the route.config, then route.app.config.

Source code in src/sgn/bottle.py
def get_config(self, key, default=None):
    """Deprecated: return the value of a config field, or ``default`` if
    it is not set. ``route.config`` is consulted, which already falls back
    to the application config for missing keys."""
    warning = "Route.get_config() is deprecated."
    advice = (
        "The Route.config property already includes values from the"
        " application config for missing keys. Access it directly."
    )
    depr(0, 13, warning, advice)
    value = self.config.get(key, default)
    return value

get_undecorated_callback()

Return the callback. If the callback is a decorated function, try to recover the original function.

Source code in src/sgn/bottle.py
def get_undecorated_callback(self):
    """Return the callback. If the callback is a decorated function, try to
    recover the original function.

    The closure chain is followed as long as it leads to another plain
    function. A function that only closes over non-function values is
    returned as it is."""
    func = self.callback
    func = getattr(func, "__func__" if py3k else "im_func", func)
    closure_attr = "__closure__" if py3k else "func_closure"
    while getattr(func, closure_attr, None):
        cells = getattr(func, closure_attr)
        inner = cells[0].cell_contents

        # in case of decorators with multiple arguments
        if not isinstance(inner, FunctionType):
            # pick first FunctionType instance from multiple arguments
            inner = next(
                (
                    content
                    for content in (cell.cell_contents for cell in cells)
                    if isinstance(content, FunctionType)
                ),
                None,
            )
            if inner is None:
                # The closure holds no function at all, so ``func`` is an
                # ordinary closure and not a decorator wrapper. Indexing
                # an empty list here used to raise IndexError.
                break
        func = inner
    return func

prepare()

Do all on-demand work immediately (useful for debugging).

Source code in src/sgn/bottle.py
def prepare(self):
    """Trigger all on-demand work right away by accessing :attr:`call`
    (useful for debugging)."""
    getattr(self, "call")

reset()

Forget any cached values. The next time :attr:call is accessed, all plugins are re-applied.

Source code in src/sgn/bottle.py
def reset(self):
    """Drop the cached :attr:`call` value, if any, so that all plugins are
    re-applied the next time it is accessed."""
    instance_state = vars(self)
    instance_state.pop("call", None)

RouteBuildError

Bases: RouteError


              flowchart TD
              sgn.bottle.RouteBuildError[RouteBuildError]
              sgn.bottle.RouteError[RouteError]
              sgn.bottle.BottleException[BottleException]

                              sgn.bottle.RouteError --> sgn.bottle.RouteBuildError
                                sgn.bottle.BottleException --> sgn.bottle.RouteError
                



              click sgn.bottle.RouteBuildError href "" "sgn.bottle.RouteBuildError"
              click sgn.bottle.RouteError href "" "sgn.bottle.RouteError"
              click sgn.bottle.BottleException href "" "sgn.bottle.BottleException"
            

The route could not be built.

Source code in src/sgn/bottle.py
class RouteBuildError(RouteError):
    """The route could not be built.

    Raised by :meth:`Router.build` when no route is registered under the
    given name or a URL argument required by the rule is missing."""

RouteError

Bases: BottleException


              flowchart TD
              sgn.bottle.RouteError[RouteError]
              sgn.bottle.BottleException[BottleException]

                              sgn.bottle.BottleException --> sgn.bottle.RouteError
                


              click sgn.bottle.RouteError href "" "sgn.bottle.RouteError"
              click sgn.bottle.BottleException href "" "sgn.bottle.BottleException"
            

This is a base class for all routing related exceptions

Source code in src/sgn/bottle.py
class RouteError(BottleException):
    """This is a base class for all routing related exceptions.

    :class:`RouteBuildError` and :class:`RouteSyntaxError` derive from it."""

RouteSyntaxError

Bases: RouteError


              flowchart TD
              sgn.bottle.RouteSyntaxError[RouteSyntaxError]
              sgn.bottle.RouteError[RouteError]
              sgn.bottle.BottleException[BottleException]

                              sgn.bottle.RouteError --> sgn.bottle.RouteSyntaxError
                                sgn.bottle.BottleException --> sgn.bottle.RouteError
                



              click sgn.bottle.RouteSyntaxError href "" "sgn.bottle.RouteSyntaxError"
              click sgn.bottle.RouteError href "" "sgn.bottle.RouteError"
              click sgn.bottle.BottleException href "" "sgn.bottle.BottleException"
            

The route parser found something not supported by this router.

Source code in src/sgn/bottle.py
class RouteSyntaxError(RouteError):
    """The route parser found something not supported by this router.

    Raised by :meth:`Router.add` when the regular expression generated for
    a rule fails to compile."""

Router

Bases: object


              flowchart TD
              sgn.bottle.Router[Router]

              

              click sgn.bottle.Router href "" "sgn.bottle.Router"
            

A Router is an ordered collection of route->target pairs. It is used to efficiently match WSGI requests against a number of routes and return the first target that satisfies the request. The target may be anything, usually a string, ID or callable object. A route consists of a path-rule and an HTTP method.

The path-rule is either a static path (e.g. /contact) or a dynamic path that contains wildcards (e.g. /wiki/<page>). The wildcard syntax and details on the matching order are described in docs:routing.

Source code in src/sgn/bottle.py
class Router(object):
    """A Router is an ordered collection of route->target pairs. It is used to
    efficiently match WSGI requests against a number of routes and return
    the first target that satisfies the request. The target may be anything,
    usually a string, ID or callable object. A route consists of a path-rule
    and an HTTP method.

    The path-rule is either a static path (e.g. `/contact`) or a dynamic
    path that contains wildcards (e.g. `/wiki/<page>`). The wildcard syntax
    and details on the matching order are described in docs:`routing`.
    """

    # Pattern and filter used for wildcards that do not name a filter.
    default_pattern = "[^/]+"
    default_filter = "re"

    #: The current CPython regexp implementation does not allow more
    #: than 99 matching groups per regular expression.
    _MAX_GROUPS_PER_PATTERN = 99

    def __init__(self, strict=False):
        """Create an empty router.

        :param strict: Stored as :attr:`strict_order`. If true, static
            routes are no longer checked first.
        """
        self.rules = []  # All rules in order
        self._groups = {}  # index of regexes to find them in dyna_routes
        self.builder = {}  # Data structure for the url builder
        self.static = {}  # Search structure for static routes
        self.dyna_routes = {}
        self.dyna_regexes = {}  # Search structure for dynamic routes
        #: If true, static routes are no longer checked first.
        self.strict_order = strict
        # Each filter maps the wildcard's config string to a
        # (regexp, to_python, to_url) tuple; see :meth:`add_filter`.
        self.filters = {
            "re": lambda conf: (_re_flatten(conf or self.default_pattern), None, None),
            "int": lambda conf: (r"-?\d+", int, lambda x: str(int(x))),
            "float": lambda conf: (r"-?[\d.]+", float, lambda x: str(float(x))),
            "path": lambda conf: (r".+?", None, None),
        }

    def add_filter(self, name, func):
        """Add a filter. The provided function is called with the configuration
        string as parameter and must return a (regexp, to_python, to_url) tuple.
        The first element is a string, the last two are callables or None."""
        self.filters[name] = func

    # Wildcard tokenizer. Groups, in order:
    #   1    leading backslashes (an odd count escapes the wildcard)
    #   2-4  old syntax ``:name#regex#``: name, an always-empty marker group
    #        (non-None only if the old syntax matched) and the regex
    #   5-7  new syntax ``<name:filter:config>``: name, filter and config
    rule_syntax = re.compile(
        "(\\\\*)"
        "(?:(?::([a-zA-Z_][a-zA-Z_0-9]*)?()(?:#(.*?)#)?)"
        "|(?:<([a-zA-Z_][a-zA-Z_0-9]*)?(?::([a-zA-Z_]*)"
        "(?::((?:\\\\.|[^\\\\>])+)?)?)?>))"
    )

    def _itertokens(self, rule):
        """Split a rule into tokens and yield them in order.

        Static text is yielded as ``(text, None, None)`` and a wildcard as
        ``(name, filter, config)``, where a missing filter is reported as
        ``"default"`` and a missing config as ``None``."""
        offset, prefix = 0, ""
        for match in self.rule_syntax.finditer(rule):
            # Collect the static text in front of this wildcard.
            prefix += rule[offset : match.start()]
            g = match.groups()
            if g[2] is not None:
                depr(
                    0,
                    13,
                    "Use of old route syntax.",
                    "Use <name> instead of :name in routes.",
                    stacklevel=4,
                )
            if len(g[0]) % 2:  # Escaped wildcard
                # Keep the wildcard text (minus the backslashes) as static
                # text and continue with the next match.
                prefix += match.group(0)[len(g[0]) :]
                offset = match.end()
                continue
            if prefix:
                yield prefix, None, None
            # Pick the group triple that belongs to the syntax that matched.
            name, filtr, conf = g[4:7] if g[2] is None else g[1:4]
            yield name, filtr or "default", conf or None
            offset, prefix = match.end(), ""
        # Emit whatever static text follows the last wildcard. The token may
        # be empty; ``add`` ignores tokens with an empty key.
        if offset <= len(rule) or prefix:
            yield prefix + rule[offset:], None, None

    def add(self, rule, method, target, name=None):
        """Add a new rule or replace the target for an existing rule.

        :param rule: The path-rule string.
        :param method: The HTTP method the rule is registered for.
        :param target: The object returned by :meth:`match` for this rule.
        :param name: Optional extra key under which the URL builder for
            this rule is stored (see :meth:`build`).
        :raises RouteSyntaxError: if the generated pattern does not compile.
        """
        anons = 0  # Number of anonymous wildcards found
        keys = []  # Names of keys
        pattern = ""  # Regular expression pattern with named groups
        filters = []  # Lists of wildcard input filters
        builder = []  # Data structure for the URL builder
        is_static = True

        for key, mode, conf in self._itertokens(rule):
            if mode:
                # Wildcard token: extend the pattern and the URL builder.
                is_static = False
                if mode == "default":
                    mode = self.default_filter
                mask, in_filter, out_filter = self.filters[mode](conf)
                if not key:
                    # Anonymous wildcards match but do not capture; the URL
                    # builder addresses them as anon0, anon1, ...
                    pattern += "(?:%s)" % mask
                    key = "anon%d" % anons
                    anons += 1
                else:
                    pattern += "(?P<%s>%s)" % (key, mask)
                    keys.append(key)
                if in_filter:
                    filters.append((key, in_filter))
                builder.append((key, out_filter or str))
            elif key:
                # Static text token.
                pattern += re.escape(key)
                builder.append((None, key))

        self.builder[rule] = builder
        if name:
            self.builder[name] = builder

        # Rules without wildcards go into a plain dict lookup, unless strict
        # ordering was requested.
        if is_static and not self.strict_order:
            self.static.setdefault(method, {})
            self.static[method][self.build(rule)] = (target, None)
            return

        try:
            re_pattern = re.compile("^(%s)$" % pattern)
            re_match = re_pattern.match
        except re.error as e:
            raise RouteSyntaxError("Could not add Route: %s (%s)" % (rule, e))

        if filters:

            # Extract the named groups and run the input filters over them.
            def getargs(path):
                url_args = re_match(path).groupdict()
                for name, wildcard_filter in filters:
                    try:
                        url_args[name] = wildcard_filter(url_args[name])
                    except ValueError:
                        raise HTTPError(400, "Path has wrong format.")
                return url_args

        elif re_pattern.groupindex:

            # Named groups but no input filters: return the raw strings.
            def getargs(path):
                return re_match(path).groupdict()

        else:
            # No named groups, so there are no URL arguments to extract.
            getargs = None

        flatpat = _re_flatten(pattern)
        whole_rule = (rule, flatpat, target, getargs)

        # A rule with the same flattened pattern and method replaces the
        # existing entry in place, which keeps its position in the order.
        if (flatpat, method) in self._groups:
            if DEBUG:
                msg = "Route <%s %s> overwrites a previously defined route"
                warnings.warn(msg % (method, rule), RuntimeWarning, stacklevel=3)
            self.dyna_routes[method][self._groups[flatpat, method]] = whole_rule
        else:
            self.dyna_routes.setdefault(method, []).append(whole_rule)
            self._groups[flatpat, method] = len(self.dyna_routes[method]) - 1

        self._compile(method)

    def _compile(self, method):
        """Rebuild the combined regular expressions for one HTTP method.

        The dynamic rules are joined in chunks of at most
        ``_MAX_GROUPS_PER_PATTERN``. Every rule contributes one group
        ``(^pattern$)`` to the alternation, so :meth:`match` can use
        ``match.lastindex`` to find the rule that matched."""
        all_rules = self.dyna_routes[method]
        comborules = self.dyna_regexes[method] = []
        maxgroups = self._MAX_GROUPS_PER_PATTERN
        for x in range(0, len(all_rules), maxgroups):
            some = all_rules[x : x + maxgroups]
            combined = (flatpat for (_, flatpat, _, _) in some)
            combined = "|".join("(^%s$)" % flatpat for flatpat in combined)
            combined = re.compile(combined).match
            rules = [(target, getargs) for (_, _, target, getargs) in some]
            comborules.append((combined, rules))

    def build(self, _name, *anons, **query):
        """Build a URL by filling the wildcards in a rule.

        :param _name: A rule string or route name known to the router.
        :param anons: Values for anonymous wildcards, in order.
        :param query: Values for named wildcards. Entries not consumed by
            a wildcard are appended as a query string.
        :raises RouteBuildError: if the name is unknown or a wildcard value
            is missing.
        """
        builder = self.builder.get(_name)
        if not builder:
            raise RouteBuildError("No route with that name.", _name)
        try:
            for i, value in enumerate(anons):
                query["anon%d" % i] = value
            # Builder entries are (None, static_text) or (key, to_url).
            url = "".join([f(query.pop(n)) if n else f for (n, f) in builder])
            return url if not query else url + "?" + urlencode(query)
        except KeyError as E:
            raise RouteBuildError("Missing URL argument: %r" % E.args[0])

    def match(self, environ):
        """Return a (target, url_args) tuple or raise HTTPError(400/404/405)."""
        verb = environ["REQUEST_METHOD"].upper()
        path = environ["PATH_INFO"] or "/"

        # Methods are tried in this order. HEAD requests fall back to GET
        # routes.
        methods = (
            ("PROXY", "HEAD", "GET", "ANY")
            if verb == "HEAD"
            else ("PROXY", verb, "ANY")
        )

        for method in methods:
            if method in self.static and path in self.static[method]:
                target, getargs = self.static[method][path]
                return target, getargs(path) if getargs else {}
            elif method in self.dyna_regexes:
                for combined, rules in self.dyna_regexes[method]:
                    match = combined(path)
                    if match:
                        # One group per rule: lastindex identifies the rule.
                        target, getargs = rules[match.lastindex - 1]
                        return target, getargs(path) if getargs else {}

        # No matching route found. Collect alternative methods for 405 response
        allowed = set([])
        nocheck = set(methods)
        for method in set(self.static) - nocheck:
            if path in self.static[method]:
                allowed.add(method)
        for method in set(self.dyna_regexes) - allowed - nocheck:
            for combined, rules in self.dyna_regexes[method]:
                match = combined(path)
                if match:
                    allowed.add(method)
        if allowed:
            allow_header = ",".join(sorted(allowed))
            raise HTTPError(405, "Method not allowed.", Allow=allow_header)

        # No matching route and no alternative method found. We give up
        raise HTTPError(404, "Not found: " + repr(path))

add(rule, method, target, name=None)

Add a new rule or replace the target for an existing rule.

Source code in src/sgn/bottle.py
def add(self, rule, method, target, name=None):
    """Add a new rule or replace the target for an existing rule.

    The rule is tokenized with ``self._itertokens``. A URL builder is always
    registered under ``rule`` (and under ``name`` if given). Rules without
    wildcards are stored in ``self.static`` unless ``self.strict_order`` is
    set; all other rules are compiled to a regular expression and stored in
    ``self.dyna_routes``.

    :param rule: The route rule string.
    :param method: Key under which the route is stored (per-method tables).
    :param target: Object stored with the route and returned on a match.
    :param name: Optional additional key for the URL builder.
    :raises RouteSyntaxError: If the assembled pattern is not a valid regex.
    """
    anons = 0  # Number of anonymous wildcards found
    keys = []  # Names of keys
    pattern = ""  # Regular expression pattern with named groups
    filters = []  # Lists of wildcard input filters
    builder = []  # Data structure for the URL builder
    is_static = True

    for key, mode, conf in self._itertokens(rule):
        if mode:
            # Wildcard token: look up its filter to get the regex mask and
            # the optional input/output converters.
            is_static = False
            if mode == "default":
                mode = self.default_filter
            mask, in_filter, out_filter = self.filters[mode](conf)
            if not key:
                # Anonymous wildcard: non-capturing group, synthetic key.
                pattern += "(?:%s)" % mask
                key = "anon%d" % anons
                anons += 1
            else:
                pattern += "(?P<%s>%s)" % (key, mask)
                keys.append(key)
            if in_filter:
                filters.append((key, in_filter))
            builder.append((key, out_filter or str))
        elif key:
            # Literal text: escaped in the pattern, copied verbatim by the
            # URL builder (marked by a ``None`` key).
            pattern += re.escape(key)
            builder.append((None, key))

    self.builder[rule] = builder
    if name:
        self.builder[name] = builder

    if is_static and not self.strict_order:
        # Static routes skip regex matching; they are keyed by the built URL.
        self.static.setdefault(method, {})
        self.static[method][self.build(rule)] = (target, None)
        return

    try:
        re_pattern = re.compile("^(%s)$" % pattern)
        re_match = re_pattern.match
    except re.error as e:
        raise RouteSyntaxError("Could not add Route: %s (%s)" % (rule, e))

    if filters:

        def getargs(path):
            # Extract named groups and run each input filter over its value;
            # a ValueError from a filter becomes a 400 response.
            url_args = re_match(path).groupdict()
            for name, wildcard_filter in filters:
                try:
                    url_args[name] = wildcard_filter(url_args[name])
                except ValueError:
                    raise HTTPError(400, "Path has wrong format.")
            return url_args

    elif re_pattern.groupindex:

        def getargs(path):
            # Named groups but no input filters: return the raw strings.
            return re_match(path).groupdict()

    else:
        # No named groups at all: there are no URL arguments to extract.
        getargs = None

    flatpat = _re_flatten(pattern)
    whole_rule = (rule, flatpat, target, getargs)

    if (flatpat, method) in self._groups:
        # Same flattened pattern and method seen before: replace in place,
        # keeping the original position in the route list.
        if DEBUG:
            msg = "Route <%s %s> overwrites a previously defined route"
            warnings.warn(msg % (method, rule), RuntimeWarning, stacklevel=3)
        self.dyna_routes[method][self._groups[flatpat, method]] = whole_rule
    else:
        self.dyna_routes.setdefault(method, []).append(whole_rule)
        self._groups[flatpat, method] = len(self.dyna_routes[method]) - 1

    # Rebuild the combined matchers for this method.
    self._compile(method)

add_filter(name, func)

Add a filter. The provided function is called with the configuration string as parameter and must return a (regexp, to_python, to_url) tuple. The first element is a string, the last two are callables or None.

Source code in src/sgn/bottle.py
def add_filter(self, name, func):
    """Add a filter. The provided function is called with the configuration
    string as parameter and must return a (regexp, to_python, to_url) tuple.
    The first element is a string, the last two are callables or None.

    The function is stored in ``self.filters`` under ``name``; an entry
    already stored under that name is replaced."""
    self.filters[name] = func

build(_name, *anons, **query)

Build a URL by filling in the wildcards of a rule.

Source code in src/sgn/bottle.py
def build(self, _name, *anons, **query):
    """Build a URL by filling in the wildcards of a rule.

    Positional arguments fill anonymous wildcards (as ``anon0``, ``anon1``,
    ...); keyword arguments fill named wildcards. Keyword arguments that are
    left over are appended as a query string.

    Raises RouteBuildError if no builder is registered under ``_name`` or if
    a wildcard value is missing.
    """
    builder = self.builder.get(_name)
    if not builder:
        raise RouteBuildError("No route with that name.", _name)
    try:
        # Positional values are addressed by their synthetic wildcard names.
        query.update(("anon%d" % index, value) for index, value in enumerate(anons))
        pieces = []
        for key, part in builder:
            # Entries with a key are converters; entries without are literals.
            pieces.append(part(query.pop(key)) if key else part)
        url = "".join(pieces)
        if query:
            return url + "?" + urlencode(query)
        return url
    except KeyError as E:
        raise RouteBuildError("Missing URL argument: %r" % E.args[0])

match(environ)

Return a (target, url_args) tuple or raise HTTPError(400/404/405).

Source code in src/sgn/bottle.py
def match(self, environ):
    """Resolve a WSGI environ to a ``(target, url_args)`` tuple.

    The request method and path are read from ``REQUEST_METHOD`` and
    ``PATH_INFO``. Raises HTTPError(405) if the path is only routed for
    other methods and HTTPError(404) if nothing matches; the argument
    getter of a matched route may raise on its own.
    """
    verb = environ["REQUEST_METHOD"].upper()
    path = environ["PATH_INFO"] or "/"

    # HEAD requests additionally fall back to GET routes.
    if verb == "HEAD":
        candidates = ("PROXY", "HEAD", "GET", "ANY")
    else:
        candidates = ("PROXY", verb, "ANY")

    for candidate in candidates:
        if candidate in self.static and path in self.static[candidate]:
            target, getargs = self.static[candidate][path]
            return target, getargs(path) if getargs else {}
        if candidate not in self.dyna_regexes:
            continue
        for combined, rules in self.dyna_regexes[candidate]:
            hit = combined(path)
            if hit:
                # The index of the matched group selects the rule.
                target, getargs = rules[hit.lastindex - 1]
                return target, getargs(path) if getargs else {}

    # Nothing matched: find out which other methods would accept the path
    # so that a 405 response can carry an Allow header.
    skipped = set(candidates)
    allowed = set()
    for other in set(self.static) - skipped:
        if path in self.static[other]:
            allowed.add(other)
    for other in set(self.dyna_regexes) - allowed - skipped:
        if any(combined(path) for combined, _rules in self.dyna_regexes[other]):
            allowed.add(other)
    if allowed:
        allow_header = ",".join(sorted(allowed))
        raise HTTPError(405, "Method not allowed.", Allow=allow_header)

    # No route for this path under any method.
    raise HTTPError(404, "Not found: " + repr(path))

SimpleTemplate

Bases: BaseTemplate


              flowchart TD
              sgn.bottle.SimpleTemplate[SimpleTemplate]
              sgn.bottle.BaseTemplate[BaseTemplate]

                              sgn.bottle.BaseTemplate --> sgn.bottle.SimpleTemplate
                


              click sgn.bottle.SimpleTemplate href "" "sgn.bottle.SimpleTemplate"
              click sgn.bottle.BaseTemplate href "" "sgn.bottle.BaseTemplate"
            
Source code in src/sgn/bottle.py
class SimpleTemplate(BaseTemplate):
    """Template that translates its source to Python code with
    :class:`StplParser` and renders by executing the compiled code."""

    def prepare(self, escape_func=html_escape, noescape=False, syntax=None, **ka):
        """Initialise the include cache and the ``_str``/``_escape`` helpers.

        ``_str`` converts a value with ``touni``; ``_escape`` additionally
        passes it through ``escape_func``. With ``noescape`` the two helpers
        are swapped. ``syntax`` is stored and later handed to the parser.
        """
        self.cache = {}
        enc = self.encoding
        self._str = lambda x: touni(x, enc)
        self._escape = lambda x: escape_func(touni(x, enc))
        self.syntax = syntax
        if noescape:
            self._str, self._escape = self._escape, self._str

    @cached_property
    def co(self):
        """Code object compiled from :attr:`code` (computed once)."""
        return compile(self.code, self.filename or "<string>", "exec")

    @cached_property
    def code(self):
        """Python source generated from the template (computed once).

        Reads ``self.filename`` if ``self.source`` is empty. As a side
        effect, ``self.encoding`` is set from the parser.
        """
        source = self.source
        if not source:
            with open(self.filename, "rb") as f:
                source = f.read()
        try:
            source, encoding = touni(source), "utf8"
        except UnicodeError:
            raise depr(
                0, 11, "Unsupported template encodings.", "Use utf-8 for templates."
            )
        parser = StplParser(source, encoding=encoding, syntax=self.syntax)
        code = parser.translate()
        self.encoding = parser.encoding
        return code

    def _rebase(self, _env, _name=None, **kwargs):
        """Record a rebase request in the environment; it is acted upon by
        :meth:`execute` after the template code has run."""
        _env["_rebase"] = (_name, kwargs)

    def _include(self, _env, _name=None, **kwargs):
        """Execute the template ``_name`` into the current output buffer.

        The sub-template runs with a copy of ``_env`` updated by ``kwargs``.
        Template instances are cached per name in ``self.cache``.
        """
        env = _env.copy()
        env.update(kwargs)
        if _name not in self.cache:
            self.cache[_name] = self.__class__(
                name=_name, lookup=self.lookup, syntax=self.syntax
            )
        return self.cache[_name].execute(env["_stdout"], env)

    def execute(self, _stdout, kwargs):
        """Run the compiled template, appending output to ``_stdout``.

        Returns the environment dict after execution (or the one returned by
        the base template if the template requested a rebase).
        """
        env = self.defaults.copy()
        env.update(kwargs)
        # Helper names made available to the template code. ``include`` and
        # ``rebase`` are bound to this very env dict.
        env.update(
            {
                "_stdout": _stdout,
                "_printlist": _stdout.extend,
                "include": functools.partial(self._include, env),
                "rebase": functools.partial(self._rebase, env),
                "_rebase": None,
                "_str": self._str,
                "_escape": self._escape,
                "get": env.get,
                "setdefault": env.setdefault,
                "defined": env.__contains__,
            }
        )
        exec(self.co, env)
        if env.get("_rebase"):
            # The template asked to be wrapped: hand the output rendered so
            # far to the base template as ``base`` and render that instead.
            subtpl, rargs = env.pop("_rebase")
            rargs["base"] = "".join(_stdout)  # copy stdout
            del _stdout[:]  # clear stdout
            return self._include(env, subtpl, **rargs)
        return env

    def render(self, *args, **kwargs):
        """Render the template using keyword arguments as local variables.

        Positional arguments are merged in order via ``dict.update``;
        keyword arguments are applied last. Returns the joined output.
        """
        env = {}
        stdout = []
        for dictarg in args:
            env.update(dictarg)
        env.update(kwargs)
        self.execute(stdout, env)
        return "".join(stdout)

render(*args, **kwargs)

Render the template using keyword arguments as local variables.

Source code in src/sgn/bottle.py
def render(self, *args, **kwargs):
    """Render the template and return the output as a single string.

    Positional arguments are merged in order into the template namespace;
    keyword arguments are applied last and therefore take precedence.
    """
    namespace = {}
    for mapping in args + (kwargs,):
        namespace.update(mapping)
    chunks = []
    self.execute(chunks, namespace)
    return "".join(chunks)

StplParser

Bases: object


              flowchart TD
              sgn.bottle.StplParser[StplParser]

              

              click sgn.bottle.StplParser href "" "sgn.bottle.StplParser"
            

Parser for stpl templates.

Source code in src/sgn/bottle.py
class StplParser(object):
    """Parser for stpl templates.

    An instance translates one template source string into Python source
    code (see :meth:`translate`). Instances are single-use.
    """

    _re_cache = {}  #: Cache for compiled re patterns

    # This pattern splits python code into 9 capture groups (unpacked in
    # read_code); text matched by none of them is copied through unchanged.
    # Group 1 (defined right here) matches python string literals.
    # We use the verbose (?x) regex mode to make this more manageable

    _re_tok = r"""(
        [urbURB]*
        (?:  ''(?!')
            |""(?!")
            |'{6}
            |"{6}
            |'(?:[^\\']|\\.)+?'
            |"(?:[^\\"]|\\.)+?"
            |'{3}(?:[^\\]|\\.|\n)+?'{3}
            |"{3}(?:[^\\]|\\.|\n)+?"{3}
        )
    )"""

    # Same string pattern without the newline alternative; re-used below for
    # inline statements.
    _re_inl = _re_tok.replace(r"|\n", "")  # We re-use this string pattern later

    _re_tok += r"""
        # 2: Comments (until end of line, but not the newline itself)
        |(\#.*)

        # 3: Open and close (4) grouping tokens
        |([\[\{\(])
        |([\]\}\)])

        # 5,6: Keywords that start or continue a python block (only start of line)
        |^([\ \t]*(?:if|for|while|with|try|def|class)\b)
        |^([\ \t]*(?:elif|else|except|finally)\b)

        # 7: Our special 'end' keyword (but only if it stands alone)
        |((?:^|;)[\ \t]*end[\ \t]*(?=(?:%(block_close)s[\ \t]*)?\r?$|;|\#))

        # 8: A customizable end-of-code-block template token (only end of line)
        |(%(block_close)s[\ \t]*(?=\r?$))

        # 9: And finally, a single newline. The 10th token is 'everything else'
        |(\r?\n)
    """

    # Match the start tokens of code areas in a template
    _re_split = r"""(?m)^[ \t]*(\\?)((%(line_start)s)|(%(block_start)s))"""
    # Match inline statements (may contain python strings)
    _re_inl = r"""%%(inline_start)s((?:%s|[^'"\n])*?)%%(inline_end)s""" % _re_inl

    # add the flag in front of the regexp to avoid Deprecation warning (see Issue #949)
    # (?mx) = multi-line and verbose mode
    _re_tok = "(?mx)" + _re_tok
    _re_inl = "(?mx)" + _re_inl

    default_syntax = "<% %> % {{ }}"

    def __init__(self, source, syntax=None, encoding="utf8"):
        """Store the source (converted with ``touni``) and reset all parser
        state. ``syntax`` falls back to :attr:`default_syntax`."""
        self.source, self.encoding = touni(source, encoding), encoding
        self.set_syntax(syntax or self.default_syntax)
        self.code_buffer, self.text_buffer = [], []
        self.lineno, self.offset = 1, 0
        self.indent, self.indent_mod = 0, 0
        self.paren_depth = 0

    def get_syntax(self):
        """Tokens as a space separated string (default: <% %> % {{ }})"""
        return self._syntax

    def set_syntax(self, syntax):
        """Set the syntax tokens and select the matching compiled patterns.

        ``syntax`` is split on whitespace into block_start, block_close,
        line_start, inline_start and inline_end (in that order). Compiled
        patterns are cached per syntax string in the class-level cache.
        """
        self._syntax = syntax
        self._tokens = syntax.split()
        if syntax not in self._re_cache:
            names = "block_start block_close line_start inline_start inline_end"
            etokens = map(re.escape, self._tokens)
            pattern_vars = dict(zip(names.split(), etokens))
            patterns = (self._re_split, self._re_tok, self._re_inl)
            patterns = [re.compile(p % pattern_vars) for p in patterns]
            self._re_cache[syntax] = patterns
        self.re_split, self.re_tok, self.re_inl = self._re_cache[syntax]

    syntax = property(get_syntax, set_syntax)

    def translate(self):
        """Translate the template source and return the generated Python
        source. Raises RuntimeError if the parser has already advanced."""
        if self.offset:
            raise RuntimeError("Parser is a one time instance.")
        while True:
            # Find the next start-of-code token (line or block start).
            m = self.re_split.search(self.source, pos=self.offset)
            if m:
                text = self.source[self.offset : m.start()]
                self.text_buffer.append(text)
                self.offset = m.end()
                if m.group(1):  # Escape syntax
                    # A backslash before the token: emit the rest of the line
                    # as plain text, without the backslash.
                    line, sep, _ = self.source[self.offset :].partition("\n")
                    self.text_buffer.append(
                        self.source[m.start() : m.start(1)] + m.group(2) + line + sep
                    )
                    self.offset += len(line + sep)
                    continue
                self.flush_text()
                # Group 4 is the block start token, i.e. a multi-line code area.
                self.offset += self.read_code(
                    self.source[self.offset :], multiline=bool(m.group(4))
                )
            else:
                break
        self.text_buffer.append(self.source[self.offset :])
        self.flush_text()
        return "".join(self.code_buffer)

    def read_code(self, pysource, multiline):
        """Consume one code area from the start of ``pysource``.

        Emits the code lines via :meth:`write_code` while tracking
        indentation and bracket depth. Returns the number of characters
        consumed from ``pysource``.
        """
        code_line, comment = "", ""
        offset = 0
        while True:
            m = self.re_tok.search(pysource, pos=offset)
            if not m:
                # No more tokens: the remainder is code.
                code_line += pysource[offset:]
                offset = len(pysource)
                self.write_code(code_line.strip(), comment)
                break
            code_line += pysource[offset : m.start()]
            offset = m.end()
            _str, _com, _po, _pc, _blk1, _blk2, _end, _cend, _nl = m.groups()
            if self.paren_depth > 0 and (_blk1 or _blk2):  # a if b else c
                # Inside brackets, block keywords are ordinary expression text.
                code_line += _blk1 or _blk2
                continue
            if _str:  # Python string
                code_line += _str
            elif _com:  # Python comment (up to EOL)
                comment = _com
                if multiline and _com.strip().endswith(self._tokens[1]):
                    multiline = False  # Allow end-of-block in comments
            elif _po:  # open parenthesis
                self.paren_depth += 1
                code_line += _po
            elif _pc:  # close parenthesis
                if self.paren_depth > 0:
                    # we could check for matching parentheses here, but it's
                    # easier to leave that to python - just check counts
                    self.paren_depth -= 1
                code_line += _pc
            elif _blk1:  # Start-block keyword (if/for/while/def/try/...)
                code_line = _blk1
                self.indent += 1
                # The keyword line itself is written one level less indented.
                self.indent_mod -= 1
            elif _blk2:  # Continue-block keyword (else/elif/except/...)
                code_line = _blk2
                self.indent_mod -= 1
            elif _cend:  # The end-code-block template token (usually '%>')
                if multiline:
                    multiline = False
                else:
                    code_line += _cend
            elif _end:
                self.indent -= 1
                self.indent_mod += 1
            else:  # \n
                self.write_code(code_line.strip(), comment)
                self.lineno += 1
                code_line, comment, self.indent_mod = "", "", 0
                if not multiline:
                    break

        return offset

    def flush_text(self):
        """Turn the buffered template text into one ``_printlist((...,))``
        code line and clear the buffer. Does nothing if the buffer is empty.
        """
        text = "".join(self.text_buffer)
        del self.text_buffer[:]
        if not text:
            return
        # ``nl`` continues the generated statement on the next source line.
        parts, pos, nl = [], 0, "\\\n" + "  " * self.indent
        for m in self.re_inl.finditer(text):
            prefix, pos = text[pos : m.start()], m.end()
            if prefix:
                parts.append(nl.join(map(repr, prefix.splitlines(True))))
            if prefix.endswith("\n"):
                parts[-1] += nl
            parts.append(self.process_inline(m.group(1).strip()))
        if pos < len(text):
            prefix = text[pos:]
            lines = prefix.splitlines(True)
            # A trailing double backslash before the final line break removes
            # both the backslashes and the line break from the output.
            if lines[-1].endswith("\\\\\n"):
                lines[-1] = lines[-1][:-3]
            elif lines[-1].endswith("\\\\\r\n"):
                lines[-1] = lines[-1][:-4]
            parts.append(nl.join(map(repr, lines)))
        code = "_printlist((%s,))" % ", ".join(parts)
        self.lineno += code.count("\n") + 1
        self.write_code(code)

    @staticmethod
    def process_inline(chunk):
        """Wrap an inline expression in ``_escape(...)``, or in ``_str(...)``
        if it starts with ``!`` (the ``!`` is dropped)."""
        if chunk[0] == "!":
            return "_str(%s)" % chunk[1:]
        return "_escape(%s)" % chunk

    def write_code(self, line, comment=""):
        """Append one line to the code buffer, indented by two spaces per
        level of ``indent + indent_mod`` and followed by ``comment``."""
        code = "  " * (self.indent + self.indent_mod)
        code += line.lstrip() + comment + "\n"
        self.code_buffer.append(code)

get_syntax()

Tokens as a space separated string (default: <% %> % {{ }})

Source code in src/sgn/bottle.py
def get_syntax(self):
    """Tokens as a space separated string (default: <% %> % {{ }})

    Returns the value stored in ``self._syntax``."""
    return self._syntax

TemplatePlugin

Bases: object


              flowchart TD
              sgn.bottle.TemplatePlugin[TemplatePlugin]

              

              click sgn.bottle.TemplatePlugin href "" "sgn.bottle.TemplatePlugin"
            

This plugin applies the :func:view decorator to all routes with a template config parameter. If the parameter is a tuple, the second element must be a dict with additional options (e.g. template_engine) or default variables for the template.

Source code in src/sgn/bottle.py
class TemplatePlugin(object):
    """Apply the :func:`view` decorator to routes that carry a `template`
    config parameter.

    The parameter is either a template name (string) or a two-element
    tuple/list whose second element is a dict of additional options
    (e.g. `template_engine`) or default variables for the template. Any
    other value leaves the route callback untouched."""

    name = "template"
    api = 2

    def setup(self, app):
        """Expose this plugin on the application as ``app.tpl``."""
        app.tpl = self

    def apply(self, callback, route):
        """Return ``callback`` wrapped by :func:`view` if the route is
        configured with a template, otherwise return it unchanged."""
        conf = route.config.get("template")
        if isinstance(conf, str):
            return view(conf)(callback)
        if isinstance(conf, (tuple, list)) and len(conf) == 2:
            template, options = conf[0], conf[1]
            return view(template, **options)(callback)
        return callback

TornadoServer

Bases: ServerAdapter


              flowchart TD
              sgn.bottle.TornadoServer[TornadoServer]
              sgn.bottle.ServerAdapter[ServerAdapter]

                              sgn.bottle.ServerAdapter --> sgn.bottle.TornadoServer
                


              click sgn.bottle.TornadoServer href "" "sgn.bottle.TornadoServer"
              click sgn.bottle.ServerAdapter href "" "sgn.bottle.ServerAdapter"
            

The super hyped asynchronous server by facebook. Untested.

Source code in src/sgn/bottle.py
class TornadoServer(ServerAdapter):
    """Server adapter for the asynchronous Tornado server. Untested."""

    def run(self, handler):  # pragma: no cover
        """Serve the WSGI ``handler`` on ``self.host``/``self.port`` and
        start the Tornado IO loop."""
        from tornado.httpserver import HTTPServer
        from tornado.ioloop import IOLoop
        from tornado.wsgi import WSGIContainer

        http_server = HTTPServer(WSGIContainer(handler))
        http_server.listen(port=self.port, address=self.host)
        IOLoop.instance().start()

TwistedServer

Bases: ServerAdapter


              flowchart TD
              sgn.bottle.TwistedServer[TwistedServer]
              sgn.bottle.ServerAdapter[ServerAdapter]

                              sgn.bottle.ServerAdapter --> sgn.bottle.TwistedServer
                


              click sgn.bottle.TwistedServer href "" "sgn.bottle.TwistedServer"
              click sgn.bottle.ServerAdapter href "" "sgn.bottle.ServerAdapter"
            

Untested.

Source code in src/sgn/bottle.py
class TwistedServer(ServerAdapter):
    """Server adapter for Twisted Web. Untested."""

    def run(self, handler):
        """Serve the WSGI ``handler`` through a Twisted thread pool on
        ``self.host``/``self.port``; start the reactor unless it is
        already running."""
        from twisted.internet import reactor
        from twisted.python.threadpool import ThreadPool
        from twisted.web import server, wsgi

        pool = ThreadPool()
        pool.start()
        # Stop the worker threads once the reactor has shut down.
        reactor.addSystemEventTrigger("after", "shutdown", pool.stop)
        resource = wsgi.WSGIResource(reactor, pool, handler)
        site = server.Site(resource)
        reactor.listenTCP(self.port, site, interface=self.host)
        if reactor.running:
            return
        reactor.run()

WSGIHeaderDict

Bases: MutableMapping


              flowchart TD
              sgn.bottle.WSGIHeaderDict[WSGIHeaderDict]

              

              click sgn.bottle.WSGIHeaderDict href "" "sgn.bottle.WSGIHeaderDict"
            

This dict-like class wraps a WSGI environ dict and provides convenient access to HTTP_* fields. Keys and values are native strings (2.x bytes or 3.x unicode) and keys are case-insensitive. If the WSGI environment contains non-native string values, these are de- or encoded using a lossless 'latin1' character set.

The API will remain stable even on changes to the relevant PEPs. Currently PEP 333, 444 and 3333 are supported. (PEP 444 is the only one that uses non-native strings.)

Source code in src/sgn/bottle.py
class WSGIHeaderDict(DictMixin):
    """This dict-like class wraps a WSGI environ dict and provides convenient
    access to HTTP_* fields. Keys and values are native strings
    (2.x bytes or 3.x unicode) and keys are case-insensitive. If the WSGI
    environment contains non-native string values, these are de- or encoded
    using a lossless 'latin1' character set.

    The API will remain stable even on changes to the relevant PEPs.
    Currently PEP 333, 444 and 3333 are supported. (PEP 444 is the only one
    that uses non-native strings.)

    The mapping is read-only: item assignment and deletion raise TypeError.
    """

    #: List of keys that do not have a ``HTTP_`` prefix.
    cgikeys = ("CONTENT_TYPE", "CONTENT_LENGTH")

    def __init__(self, environ):
        """Wrap ``environ``; the dict is referenced, not copied."""
        self.environ = environ

    def _ekey(self, key):
        """Translate header field name to CGI/WSGI environ key."""
        key = key.replace("-", "_").upper()
        if key in self.cgikeys:
            return key
        return "HTTP_" + key

    def raw(self, key, default=None):
        """Return the header value as is (may be bytes or unicode)."""
        return self.environ.get(self._ekey(key), default)

    def __getitem__(self, key):
        """Return the header value for ``key``; raises KeyError if the
        corresponding environ key is missing."""
        val = self.environ[self._ekey(key)]
        if py3k:
            # NOTE(review): values are re-decoded as UTF-8 here, whereas the
            # class docstring speaks of 'latin1' only -- confirm which
            # charset is intended.
            if isinstance(val, unicode):
                val = val.encode("latin1").decode("utf8")
            else:
                val = val.decode("utf8")
        return val

    def __setitem__(self, key, value):
        raise TypeError("%s is read-only." % self.__class__)

    def __delitem__(self, key):
        raise TypeError("%s is read-only." % self.__class__)

    def __iter__(self):
        """Yield header names (converted with ``_hkey``) for every ``HTTP_*``
        environ key and for the keys listed in :attr:`cgikeys`."""
        for key in self.environ:
            if key[:5] == "HTTP_":
                yield _hkey(key[5:])
            elif key in self.cgikeys:
                yield _hkey(key)

    def keys(self):
        """Return the header names as a list."""
        # NOTE(review): __len__ is implemented via keys(); keep this a plain
        # iteration so that building the list never consults __len__.
        return [x for x in self]

    def __len__(self):
        return len(self.keys())

    def __contains__(self, key):
        return self._ekey(key) in self.environ

raw(key, default=None)

Return the header value as is (may be bytes or unicode).

Source code in src/sgn/bottle.py
def raw(self, key, default=None):
    """Return the header value as is (may be bytes or unicode).

    The header name is translated with ``self._ekey`` and looked up in
    ``self.environ``; ``default`` is returned if the key is missing."""
    return self.environ.get(self._ekey(key), default)

cached_property

Bases: object


              flowchart TD
              sgn.bottle.cached_property[cached_property]

              

              click sgn.bottle.cached_property href "" "sgn.bottle.cached_property"
            

A property that is only computed once per instance and then replaces itself with an ordinary attribute. Deleting the attribute resets the property.

Source code in src/sgn/bottle.py
class cached_property(object):
    """Descriptor that computes a value once per instance.

    On first access the wrapped function is called and its result is stored
    in the instance ``__dict__`` under the function's name, so later lookups
    bypass the descriptor. Deleting the attribute resets the property."""

    def __init__(self, func):
        update_wrapper(self, func)
        self.func = func

    def __get__(self, obj, cls):
        if obj is not None:
            result = self.func(obj)
            # Shadow the descriptor with a plain instance attribute.
            obj.__dict__[self.func.__name__] = result
            return result
        # Accessed on the class: hand out the descriptor itself.
        return self

lazy_attribute

Bases: object


              flowchart TD
              sgn.bottle.lazy_attribute[lazy_attribute]

              

              click sgn.bottle.lazy_attribute href "" "sgn.bottle.lazy_attribute"
            

A property that caches itself to the class object.

Source code in src/sgn/bottle.py
class lazy_attribute(object):
    """Descriptor that computes a value from the class on first access and
    then stores it on the class under the wrapped function's name."""

    def __init__(self, func):
        functools.update_wrapper(self, func, updated=[])
        self.getter = func

    def __get__(self, obj, cls):
        computed = self.getter(cls)
        # Replace the descriptor on the class with the computed value.
        setattr(cls, self.__name__, computed)
        return computed

abort(code=500, text='Unknown Error.')

Aborts execution and causes a HTTP error.

Source code in src/sgn/bottle.py
def abort(code=500, text="Unknown Error."):
    """Aborts execution and causes a HTTP error.

    Always raises ``HTTPError(code, text)``; this function never returns."""
    raise HTTPError(code, text)

auth_basic(check, realm='private', text='Access denied')

Callback decorator to require HTTP auth (basic). TODO: Add route(check_auth=...) parameter.

Source code in src/sgn/bottle.py
def auth_basic(check, realm="private", text="Access denied"):
    """Callback decorator to require HTTP auth (basic).

    ``check(user, password)`` decides whether the credentials found in
    ``request.auth`` are accepted. If they are missing or rejected, an
    HTTPError(401) carrying a ``WWW-Authenticate`` header is returned
    instead of calling the decorated function.
    TODO: Add route(check_auth=...) parameter."""

    def decorator(func):

        @functools.wraps(func)
        def wrapper(*a, **ka):
            credentials = request.auth or (None, None)
            user, password = credentials
            if user is not None and check(user, password):
                return func(*a, **ka)
            err = HTTPError(401, text)
            err.add_header("WWW-Authenticate", 'Basic realm="%s"' % realm)
            return err

        return wrapper

    return decorator

cookie_decode(data, key, digestmod=None)

Verify and decode an encoded string. Return an object or None.

Source code in src/sgn/bottle.py
def cookie_decode(data, key, digestmod=None):
    """Verify and decode an encoded string. Return an object or None.

    The payload is only unpickled after its HMAC signature (keyed with
    ``key``, default digest SHA-256) has been verified."""
    depr(
        0, 13, "cookie_decode() will be removed soon.", "Do not use this API directly."
    )
    data = tob(data)
    if not cookie_is_encoded(data):
        return None
    sig, msg = data.split(tob("?"), 1)
    digestmod = digestmod or hashlib.sha256
    mac = hmac.new(tob(key), msg, digestmod=digestmod)
    # sig[1:] drops the leading "!" marker before comparing signatures.
    if not _lscmp(sig[1:], base64.b64encode(mac.digest())):
        return None
    # SECURITY: pickle is only safe here as long as ``key`` stays secret.
    return pickle.loads(base64.b64decode(msg))

cookie_encode(data, key, digestmod=None)

Encode and sign a pickle-able object. Return a (byte) string

Source code in src/sgn/bottle.py
def cookie_encode(data, key, digestmod=None):
    """Encode and sign a pickle-able object. Return a (byte) string.

    The result has the form ``!<signature>?<payload>``, where the payload
    is the base64-encoded pickle of ``data`` and the signature is the
    base64-encoded HMAC of the payload (default digest SHA-256)."""
    depr(
        0, 13, "cookie_encode() will be removed soon.", "Do not use this API directly."
    )
    digestmod = digestmod or hashlib.sha256
    payload = base64.b64encode(pickle.dumps(data, -1))
    mac = hmac.new(tob(key), payload, digestmod=digestmod)
    signature = base64.b64encode(mac.digest())
    return tob("!") + signature + tob("?") + payload

cookie_is_encoded(data)

Return True if the argument looks like an encoded cookie.

Source code in src/sgn/bottle.py
def cookie_is_encoded(data):
    """Return True if the argument looks like an encoded cookie, i.e. it
    starts with ``!`` and contains a ``?``."""
    depr(
        0,
        13,
        "cookie_is_encoded() will be removed soon.",
        "Do not use this API directly.",
    )
    if not data.startswith(tob("!")):
        return False
    return tob("?") in data

debug(mode=True)

Change the debug level. There is only one debug level supported at the moment.

Source code in src/sgn/bottle.py
def debug(mode=True):
    """Switch debug mode on or off.

    There is only one debug level. Enabling it also resets the warnings
    filter to "default"."""
    global DEBUG
    enabled = bool(mode)
    if enabled:
        warnings.simplefilter("default")
    DEBUG = enabled

html_escape(string)

Escape HTML special characters &<> and quotes '".

Source code in src/sgn/bottle.py
def html_escape(string):
    """Escape HTML special characters ``&<>`` and quotes ``'"``."""
    # "&" must come first so that the entities added below are not
    # escaped a second time.
    replacements = (
        ("&", "&amp;"),
        ("<", "&lt;"),
        (">", "&gt;"),
        ('"', "&quot;"),
        ("'", "&#039;"),
    )
    for char, entity in replacements:
        string = string.replace(char, entity)
    return string

html_quote(string)

Escape and quote a string to be used as an HTML attribute.

Source code in src/sgn/bottle.py
def html_quote(string):
    """Escape and quote a string to be used as an HTML attribute.

    In addition to the characters handled by ``html_escape``, newline,
    carriage return and tab are replaced by numeric character references.
    The result is wrapped in double quotes."""
    escaped = html_escape(string)
    for char, entity in (("\n", "&#10;"), ("\r", "&#13;"), ("\t", "&#9;")):
        escaped = escaped.replace(char, entity)
    return '"%s"' % escaped

load(target, **namespace)

Import a module or fetch an object from a module.

  • package.module returns module as a module object.
  • pack.mod:name returns the module variable name from pack.mod.
  • pack.mod:func() calls pack.mod.func() and returns the result.

The last form accepts not only function calls, but any type of expression. Keyword arguments passed to this function are available as local variables. Example: load('re:compile(x)', x='[a-z]')

Source code in src/sgn/bottle.py
def load(target, **namespace):
    """Import a module or fetch an object from a module.

    * ``package.module`` returns `module` as a module object.
    * ``pack.mod:name`` returns the module variable `name` from `pack.mod`.
    * ``pack.mod:func()`` calls `pack.mod.func()` and returns the result.

    The last form accepts any expression, not just function calls; it is
    evaluated with ``eval``, so ``target`` must come from a trusted source.
    Keyword arguments passed to this function are available to the
    expression as variables. Example: ``load('re:compile(x)', x='[a-z]')``
    """
    # Without a ":" (or with nothing after it) only the module is wanted.
    module, _sep, attr = target.partition(":")
    if module not in sys.modules:
        __import__(module)
    if not attr:
        return sys.modules[module]
    if attr.isalnum():
        return getattr(sys.modules[module], attr)
    # Anything else is an expression, evaluated relative to the module. The
    # top-level package must be resolvable by name inside the expression.
    package_name = module.split(".")[0]
    namespace[package_name] = sys.modules[package_name]
    expression = "%s.%s" % (module, attr)
    return eval(expression, namespace)

load_app(target)

Load a bottle application from a module and make sure that the import does not affect the current default application, but returns a separate application object. See :func:load for the target parameter.

Source code in src/sgn/bottle.py
def load_app(target):
    """Load a bottle application from a module without touching the current
    default application.

    A temporary default application is pushed while the target is loaded
    and removed again afterwards. If the loaded object is callable it is
    returned; otherwise the temporary application is returned. See
    :func:`load` for the target parameter."""
    global NORUN
    nr_old = NORUN
    NORUN = True
    tmp = default_app.push()  # Create a new "default application"
    try:
        rv = load(target)  # Import the target module
        if callable(rv):
            return rv
        return tmp
    finally:
        # Always restore the application stack and the NORUN flag.
        default_app.remove(tmp)
        NORUN = nr_old

make_default_app_wrapper(name)

Return a callable that relays calls to the current default app.

Source code in src/sgn/bottle.py
def make_default_app_wrapper(name):
    """Return a callable that forwards its arguments to the method ``name``
    of whatever ``app()`` returns at call time. The wrapper carries the
    metadata of ``Bottle.<name>``."""
    template_method = getattr(Bottle, name)

    @functools.wraps(template_method)
    def wrapper(*a, **ka):
        bound = getattr(app(), name)
        return bound(*a, **ka)

    return wrapper

parse_auth(header)

Parse rfc2617 HTTP authentication header string (basic) and return (user,pass) tuple or None

Source code in src/sgn/bottle.py
def parse_auth(header):
    """Parse an rfc2617 HTTP authentication header string (basic) and return
    a (user, pass) tuple, or None if the header cannot be parsed or uses
    another scheme."""
    try:
        method, data = header.split(None, 1)
        if method.lower() != "basic":
            return None
        credentials = touni(base64.b64decode(tob(data)))
        # Only the first ":" separates user and password.
        user, pwd = credentials.split(":", 1)
        return user, pwd
    except (KeyError, ValueError):
        return None

parse_date(ims)

Parse rfc1123, rfc850 and asctime timestamps and return UTC epoch.

Source code in src/sgn/bottle.py
def parse_date(ims):
    """Parse rfc1123, rfc850 and asctime timestamps and return UTC epoch.

    Returns None if the value cannot be parsed.
    """
    try:
        parsed = email.utils.parsedate_tz(ims)
        # Fields 0-7 are the broken-down time; force the DST flag to 0.
        utc_fields = parsed[:8] + (0,)
        # Field 9 is the zone offset in seconds (None when unknown).
        zone_offset = parsed[9] or 0
        return calendar.timegm(utc_fields) - zone_offset
    except (TypeError, ValueError, IndexError, OverflowError):
        return None

parse_range_header(header, maxlen=0)

Yield (start, end) ranges parsed from a HTTP Range header. Skip unsatisfiable ranges. The end index is non-inclusive.

Source code in src/sgn/bottle.py
def parse_range_header(header, maxlen=0):
    """Yield ``(start, end)`` ranges parsed from a HTTP Range header.

    Only ``bytes=`` headers are understood. Unsatisfiable or malformed
    ranges are skipped. The yielded end index is non-inclusive and never
    exceeds ``maxlen``.
    """
    if header and header[:6] == "bytes=":
        for spec in header[6:].split(","):
            if "-" not in spec:
                continue
            first, last = spec.split("-", 1)
            try:
                if not first:
                    # bytes=-100 -> the last 100 bytes
                    lo, hi = max(0, maxlen - int(last)), maxlen
                elif not last:
                    # bytes=100- -> everything from offset 100 onwards
                    lo, hi = int(first), maxlen
                else:
                    # bytes=100-200 -> offsets 100..200 inclusive
                    lo, hi = int(first), min(int(last) + 1, maxlen)
                if 0 <= lo < hi <= maxlen:
                    yield lo, hi
            except ValueError:
                # Non-numeric bounds: ignore this range and move on.
                continue

path_shift(script_name, path_info, shift=1)

Shift path fragments from PATH_INFO to SCRIPT_NAME and vice versa.

:return: The modified paths. :param script_name: The SCRIPT_NAME path. :param path_info: The PATH_INFO path. :param shift: The number of path fragments to shift. May be negative to change the shift direction. (default: 1)

Source code in src/sgn/bottle.py
def path_shift(script_name, path_info, shift=1):
    """Shift path fragments from PATH_INFO to SCRIPT_NAME and vice versa.

    :return: The modified paths as a ``(script_name, path_info)`` tuple.
    :param script_name: The SCRIPT_NAME path.
    :param path_info: The PATH_INFO path.
    :param shift: The number of path fragments to shift. May be negative to
      change the shift direction. (default: 1)
    :raises AssertionError: If there are fewer fragments available than
      requested.
    """
    if shift == 0:
        return script_name, path_info

    def _fragments(path):
        # "/a/b/" -> ["a", "b"]; an empty or all-slash path -> []
        parts = path.strip("/").split("/")
        if parts and parts[0] == "":
            return []
        return parts

    path_parts = _fragments(path_info)
    script_parts = _fragments(script_name)

    if 0 < shift <= len(path_parts):
        # Move leading PATH_INFO fragments to the end of SCRIPT_NAME.
        script_parts += path_parts[:shift]
        del path_parts[:shift]
    elif 0 > shift >= -len(script_parts):
        # Move trailing SCRIPT_NAME fragments to the front of PATH_INFO.
        path_parts[:0] = script_parts[shift:]
        del script_parts[shift:]
    else:
        exhausted = "SCRIPT_NAME" if shift < 0 else "PATH_INFO"
        raise AssertionError("Cannot shift. Nothing left from %s" % exhausted)

    new_script_name = "/" + "/".join(script_parts)
    new_path_info = "/" + "/".join(path_parts)
    # Preserve a trailing slash unless PATH_INFO is already just "/".
    if path_parts and path_info.endswith("/"):
        new_path_info += "/"
    return new_script_name, new_path_info

redirect(url, code=None)

Aborts execution and causes a 303 or 302 redirect, depending on the HTTP protocol version.

Source code in src/sgn/bottle.py
def redirect(url, code=None):
    """Abort execution and redirect the client to ``url``.

    Without an explicit ``code``, a 303 is used when the request's
    SERVER_PROTOCOL is HTTP/1.1 and a 302 otherwise. The redirect is
    delivered by raising an :exc:`HTTPResponse` copied from the current
    response.
    """
    if not code:
        if request.get("SERVER_PROTOCOL") == "HTTP/1.1":
            code = 303
        else:
            code = 302
    redirect_response = response.copy(cls=HTTPResponse)
    redirect_response.status = code
    redirect_response.body = ""
    # Resolve relative targets against the URL of the current request.
    target = urljoin(request.url, url)
    redirect_response.set_header("Location", target)
    raise redirect_response

run(app=None, server='wsgiref', host='127.0.0.1', port=8080, interval=1, reloader=False, quiet=False, plugins=None, debug=None, config=None, **kargs)

Start a server instance. This method blocks until the server terminates.

:param app: WSGI application or target string supported by :func:load_app. (default: :func:default_app) :param server: Server adapter to use. See :data:server_names keys for valid names or pass a :class:ServerAdapter subclass. (default: wsgiref) :param host: Server address to bind to. Pass 0.0.0.0 to listen on all interfaces including the external one. (default: 127.0.0.1) :param port: Server port to bind to. Values below 1024 require root privileges. (default: 8080) :param reloader: Start auto-reloading server? (default: False) :param interval: Auto-reloader interval in seconds (default: 1) :param quiet: Suppress output to stdout and stderr? (default: False) :param kargs: Additional options passed to the server adapter.

Source code in src/sgn/bottle.py
def run(
    app=None,
    server="wsgiref",
    host="127.0.0.1",
    port=8080,
    interval=1,
    reloader=False,
    quiet=False,
    plugins=None,
    debug=None,
    config=None,
    **kargs
):
    """Start a server instance. This method blocks until the server terminates.

    Does nothing if the module-level ``NORUN`` flag is set.

    :param app: WSGI application or target string supported by
           :func:`load_app`. (default: :func:`default_app`)
    :param server: Server adapter to use. See :data:`server_names` keys
           for valid names or pass a :class:`ServerAdapter` subclass.
           (default: `wsgiref`)
    :param host: Server address to bind to. Pass ``0.0.0.0`` to listen on
           all interfaces including the external one. (default: 127.0.0.1)
    :param port: Server port to bind to. Values below 1024 require root
           privileges. (default: 8080)
    :param reloader: Start auto-reloading server? (default: False)
    :param interval: Auto-reloader interval in seconds (default: 1)
    :param quiet: Suppress output to stdout and stderr? (default: False)
    :param plugins: Iterable of plugins to install on the application.
           String entries are resolved with :func:`load` first.
           (default: None)
    :param debug: If not None, the value is passed on to the module's debug
           switch before the server starts. (default: None)
    :param config: If given, merged into ``app.config`` via ``update()``.
           (default: None)
    :param kargs: Additional options passed to the server adapter
           constructor when ``server`` resolves to a class.
    """
    if NORUN:
        return
    # Reloader, parent side: this process only supervises. It re-launches
    # the same command line as a child process (marked via BOTTLE_CHILD) and
    # restarts it whenever the child exits with status 3.
    if reloader and not os.environ.get("BOTTLE_CHILD"):
        import subprocess

        fd, lockfile = tempfile.mkstemp(prefix="bottle.", suffix=".lock")
        environ = os.environ.copy()
        environ["BOTTLE_CHILD"] = "true"
        environ["BOTTLE_LOCKFILE"] = lockfile
        args = [sys.executable] + sys.argv
        # If a package was loaded with `python -m`, then `sys.argv` needs to be
        # restored to the original value, or imports might break. See #1336
        if getattr(sys.modules.get("__main__"), "__package__", None):
            args[1:1] = ["-m", sys.modules["__main__"].__package__]

        try:
            os.close(fd)  # We never write to this file
            # Keep spawning children for as long as the lockfile exists.
            while os.path.exists(lockfile):
                p = subprocess.Popen(args, env=environ)
                while p.poll() is None:
                    os.utime(lockfile, None)  # Tell child we are still alive
                    time.sleep(interval)
                if p.returncode == 3:  # Child wants to be restarted
                    continue
                # Any other exit status ends the supervisor with that status.
                sys.exit(p.returncode)
        except KeyboardInterrupt:
            pass
        finally:
            if os.path.exists(lockfile):
                os.unlink(lockfile)
        return

    # From here on: either the reloader is off, or we are the reloader child.
    try:
        if debug is not None:
            _debug(debug)
        app = app or default_app()
        if isinstance(app, basestring):
            app = load_app(app)
        if not callable(app):
            raise ValueError("Application is not callable: %r" % app)

        for plugin in plugins or []:
            if isinstance(plugin, basestring):
                plugin = load(plugin)
            app.install(plugin)

        if config:
            app.config.update(config)

        # Resolve `server` step by step: registered name -> import target
        # string -> adapter class -> adapter instance.
        if server in server_names:
            server = server_names.get(server)
        if isinstance(server, basestring):
            server = load(server)
        if isinstance(server, type):
            server = server(host=host, port=port, **kargs)
        if not isinstance(server, ServerAdapter):
            raise ValueError("Unknown or unsupported server: %r" % server)

        # Either the adapter's own setting or the `quiet` argument can
        # silence the startup banner.
        server.quiet = server.quiet or quiet
        if not server.quiet:
            _stderr(
                "Bottle v%s server starting up (using %s)..."
                % (__version__, repr(server))
            )
            _stderr("Listening on %s" % server._listen_url)
            _stderr("Hit Ctrl-C to quit.\n")

        if reloader:
            # Reloader, child side: serve while a FileCheckerThread runs in
            # the background (presumably watching the lockfile and module
            # files -- confirm against FileCheckerThread).
            lockfile = os.environ.get("BOTTLE_LOCKFILE")
            bgcheck = FileCheckerThread(lockfile, interval)
            with bgcheck:
                server.run(app)
            if bgcheck.status == "reload":
                sys.exit(3)  # Status 3 asks the parent process for a restart
        else:
            server.run(app)
    except KeyboardInterrupt:
        pass
    except (SystemExit, MemoryError):
        raise
    except:
        # Without the reloader, errors propagate to the caller. With it, the
        # traceback is printed (unless quiet) and the child exits with
        # status 3 after a pause, so the parent starts a fresh child.
        if not reloader:
            raise
        if not getattr(server, "quiet", quiet):
            print_exc()
        time.sleep(interval)
        sys.exit(3)

static_file(filename, root, mimetype=True, download=False, charset='UTF-8', etag=None, headers=None)

Open a file in a safe way and return an instance of :exc:HTTPResponse that can be sent back to the client.

:param filename: Name or path of the file to send, relative to root. :param root: Root path for file lookups. Should be an absolute directory path. :param mimetype: Provide the content-type header (default: guess from file extension) :param download: If True, ask the browser to open a Save as... dialog instead of opening the file with the associated program. You can specify a custom filename as a string. If not specified, the original filename is used (default: False). :param charset: The charset for files with a text/* mime-type. (default: UTF-8) :param etag: Provide a pre-computed ETag header. If set to False, ETag handling is disabled. (default: auto-generate ETag header) :param headers: Additional headers dict to add to the response.

While checking user input is always a good idea, this function provides additional protection against malicious filename parameters from breaking out of the root directory and leaking sensitive information to an attacker.

Read-protected files or files outside of the root directory are answered with 403 Access Denied. Missing files result in a 404 Not Found response. Conditional requests (If-Modified-Since, If-None-Match) are answered with 304 Not Modified whenever possible. HEAD and Range requests (used by download managers to check or continue partial downloads) are also handled automatically.

Source code in src/sgn/bottle.py
def static_file(
    filename,
    root,
    mimetype=True,
    download=False,
    charset="UTF-8",
    etag=None,
    headers=None,
):
    """Open a file in a safe way and return an instance of :exc:`HTTPResponse`
    that can be sent back to the client.

    :param filename: Name or path of the file to send, relative to ``root``.
    :param root: Root path for file lookups. Should be an absolute directory
        path.
    :param mimetype: Provide the content-type header (default: guess from
        file extension)
    :param download: If True, ask the browser to open a `Save as...` dialog
        instead of opening the file with the associated program. You can
        specify a custom filename as a string. If not specified, the
        original filename is used (default: False).
    :param charset: The charset for files with a ``text/*`` mime-type.
        (default: UTF-8)
    :param etag: Provide a pre-computed ETag header. If set to ``False``,
        ETag handling is disabled. (default: auto-generate ETag header)
    :param headers: Additional headers dict to add to the response.
    :return: An :exc:`HTTPResponse` (status 200, 206 or 304) or an
        :exc:`HTTPError` (status 403, 404 or 416).

    While checking user input is always a good idea, this function provides
    additional protection against malicious ``filename`` parameters from
    breaking out of the ``root`` directory and leaking sensitive information
    to an attacker.

    Read-protected files or files outside of the ``root`` directory are
    answered with ``403 Access Denied``. Missing files result in a
    ``404 Not Found`` response. Conditional requests (``If-Modified-Since``,
    ``If-None-Match``) are answered with ``304 Not Modified`` whenever
    possible. ``HEAD`` and ``Range`` requests (used by download managers to
    check or continue partial downloads) are also handled automatically.
    A ``Range`` header without any satisfiable range is answered with
    ``416 Requested Range Not Satisfiable``.
    """

    # The trailing separator on `root` ensures the startswith() check below
    # cannot be fooled by sibling directories sharing a name prefix.
    root = os.path.join(os.path.abspath(root), "")
    filename = os.path.abspath(os.path.join(root, filename.strip("/\\")))
    headers = headers.copy() if headers else {}
    getenv = request.environ.get

    if not filename.startswith(root):
        return HTTPError(403, "Access denied.")
    if not os.path.exists(filename) or not os.path.isfile(filename):
        return HTTPError(404, "File does not exist.")
    if not os.access(filename, os.R_OK):
        return HTTPError(403, "You do not have permission to access this file.")

    if mimetype is True:
        # Guess from the download name if one was given, else the real path.
        name = download if isinstance(download, str) else filename
        mimetype, encoding = mimetypes.guess_type(name)
        if encoding == "gzip":
            mimetype = "application/gzip"
        elif encoding:  # e.g. bzip2 -> application/x-bzip2
            mimetype = "application/x-" + encoding

    if (
        charset
        and mimetype
        and "charset=" not in mimetype
        and (mimetype[:5] == "text/" or mimetype == "application/javascript")
    ):
        mimetype += "; charset=%s" % charset

    if mimetype:
        headers["Content-Type"] = mimetype

    if download is True:
        download = os.path.basename(filename)

    if download:
        # Drop double quotes so the name cannot break out of the quoted
        # filename="..." parameter.
        download = download.replace('"', "")
        headers["Content-Disposition"] = 'attachment; filename="%s"' % download

    stats = os.stat(filename)
    headers["Content-Length"] = clen = stats.st_size
    headers["Last-Modified"] = email.utils.formatdate(stats.st_mtime, usegmt=True)
    headers["Date"] = email.utils.formatdate(time.time(), usegmt=True)

    if etag is None:
        etag = "%d:%d:%d:%d:%s" % (
            stats.st_dev,
            stats.st_ino,
            stats.st_mtime,
            clen,
            filename,
        )
        etag = hashlib.sha1(tob(etag)).hexdigest()

    if etag:
        headers["ETag"] = etag
        check = getenv("HTTP_IF_NONE_MATCH")
        if check and check == etag:
            return HTTPResponse(status=304, **headers)

    ims = getenv("HTTP_IF_MODIFIED_SINCE")
    if ims:
        ims = parse_date(ims.split(";")[0].strip())
        if ims is not None and ims >= int(stats.st_mtime):
            return HTTPResponse(status=304, **headers)

    headers["Accept-Ranges"] = "bytes"
    range_header = getenv("HTTP_RANGE")
    ranges = None
    if range_header:
        # Validate the requested range *before* opening the file, so the
        # 416 early return cannot leave an open file handle behind.
        ranges = list(parse_range_header(range_header, clen))
        if not ranges:
            return HTTPError(416, "Requested Range Not Satisfiable")

    # HEAD requests get headers only; everything else streams the file.
    body = "" if request.method == "HEAD" else open(filename, "rb")

    if ranges:
        # Only the first satisfiable range is served.
        offset, end = ranges[0]
        rlen = end - offset
        # `end` is exclusive, Content-Range wants an inclusive last byte.
        headers["Content-Range"] = "bytes %d-%d/%d" % (offset, end - 1, clen)
        headers["Content-Length"] = str(rlen)
        if body:
            body = _closeiter(_rangeiter(body, offset, rlen), body.close)
        return HTTPResponse(body, status=206, **headers)
    return HTTPResponse(body, **headers)

template(*args, **kwargs)

Get a rendered template as a string iterator. You can use a name, a filename or a template string as first parameter. Template rendering arguments can be passed as dictionaries or directly (as keyword arguments).

Source code in src/sgn/bottle.py
def template(*args, **kwargs):
    """
    Get a rendered template as a string iterator.
    You can use a name, a filename or a template string as first parameter.
    Template rendering arguments can be passed as dictionaries
    or directly (as keyword arguments).

    The keyword arguments ``template_adapter``, ``template_lookup`` and
    ``template_settings`` control how the template is created; they are
    removed from the rendering arguments and never passed to the template.
    """
    tpl = args[0] if args else None
    for dictarg in args[1:]:
        kwargs.update(dictarg)
    adapter = kwargs.pop("template_adapter", SimpleTemplate)
    lookup = kwargs.pop("template_lookup", TEMPLATE_PATH)
    # Always remove the settings from kwargs. Popping them only on a cache
    # miss would let ``template_settings`` leak into the template namespace
    # on every cache hit.
    settings = kwargs.pop("template_settings", {})
    # Cache key: identity of the lookup path list plus the template itself.
    tplid = (id(lookup), tpl)
    if tplid not in TEMPLATES or DEBUG:
        if isinstance(tpl, adapter):
            # Already a template object: cache it as-is.
            TEMPLATES[tplid] = tpl
            if settings:
                TEMPLATES[tplid].prepare(**settings)
        elif "\n" in tpl or "{" in tpl or "%" in tpl or "$" in tpl:
            # Contains template syntax characters: treat as inline source.
            TEMPLATES[tplid] = adapter(source=tpl, lookup=lookup, **settings)
        else:
            # Otherwise treat it as a template name to look up.
            TEMPLATES[tplid] = adapter(name=tpl, lookup=lookup, **settings)
    if not TEMPLATES[tplid]:
        abort(500, "Template (%s) not found" % tpl)
    return TEMPLATES[tplid].render(kwargs)

view(tpl_name, **defaults)

Decorator: renders a template for a handler. The handler can control its behavior like that:

  • return a dict of template vars to fill out the template
  • return something other than a dict and the view decorator will not process the template, but return the handler result as is. This includes returning a HTTPResponse(dict) to get, for instance, JSON with autojson or other castfilters.
Source code in src/sgn/bottle.py
def view(tpl_name, **defaults):
    """Decorator: render the template ``tpl_name`` with a handler's result.

    What the decorated handler returns decides what happens:

      - a dict (or DictMixin): merged over ``defaults`` and used as the
        template variables
      - None: the template is rendered with ``defaults`` only
      - anything else: returned unchanged, without rendering the template.
        This includes returning a HTTPResponse(dict) to get,
        for instance, JSON with autojson or other castfilters.
    """

    def decorator(func):

        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            outcome = func(*args, **kwargs)
            if outcome is None:
                return template(tpl_name, **defaults)
            if not isinstance(outcome, (dict, DictMixin)):
                return outcome
            # Handler values take precedence over the decorator defaults.
            merged = defaults.copy()
            merged.update(outcome)
            return template(tpl_name, **merged)

        return wrapper

    return decorator

yieldroutes(func)

Return a generator for routes that match the signature (name, args) of the func parameter. This may yield more than one route if the function takes optional keyword arguments. The output is best described by example::

a()         -> '/a'
b(x, y)     -> '/b/<x>/<y>'
c(x, y=5)   -> '/c/<x>' and '/c/<x>/<y>'
d(x=5, y=6) -> '/d' and '/d/<x>' and '/d/<x>/<y>'
Source code in src/sgn/bottle.py
def yieldroutes(func):
    """Generate route paths matching the name and arguments of ``func``.

    Double underscores in the function name become path separators. The
    first route contains all arguments without a default value; one more
    route is yielded for each additional argument. Examples::

        a()         -> '/a'
        b(x, y)     -> '/b/<x>/<y>'
        c(x, y=5)   -> '/c/<x>' and '/c/<x>/<y>'
        d(x=5, y=6) -> '/d' and '/d/<x>' and '/d/<x>/<y>'
    """
    route = "/" + func.__name__.replace("__", "/").lstrip("/")
    spec = getargspec(func)
    arg_names = spec[0]
    defaults = spec[3] or []
    # Arguments without a default value are part of every route.
    required = len(arg_names) - len(defaults)
    route += "".join("/<%s>" % arg for arg in arg_names[:required])
    yield route
    # Each further argument extends the previous route by one wildcard.
    for optional in arg_names[required:]:
        route += "/<%s>" % optional
        yield route