Module aiocqhttp

此模块主要提供了 CQHttp 类(类似于 Flask 的 Flask 类和 Quart 的 Quart 类);除此之外,还从 aiocqhttp.messageaiocqhttp.eventaiocqhttp.exceptions 模块导入了一些常用的类、模块变量和函数,以便于使用。

Expand source code
"""
此模块主要提供了 `CQHttp` 类(类似于 Flask 的 `Flask` 类和 Quart 的 `Quart`
类);除此之外,还从 `message`、`event`、`exceptions`
模块导入了一些常用的类、模块变量和函数,以便于使用。
"""

import asyncio
import hmac
import logging
import re
from typing import (Dict, Any, Optional, AnyStr, Callable, Union, Awaitable,
                    Coroutine)

try:
    import ujson as json
except ImportError:
    import json

from quart import Quart, request, abort, jsonify, websocket, Response

from .api_impl import (AsyncApi, SyncApi, HttpApi, WebSocketReverseApi,
                       UnifiedApi, ResultStore)
from .bus import EventBus
from .exceptions import Error, TimingError
from .event import Event
from .message import Message, MessageSegment
from .utils import ensure_async, run_async_funcs
from .typing import Message_T

from . import exceptions
from .exceptions import *  # noqa: F401, F403

__all__ = [
    'CQHttp', 'Event', 'Message', 'MessageSegment',
]
__all__ += exceptions.__all__

__pdoc__ = {}


def _deco_maker(deco_method: Callable, type_: str) -> Callable:
    def deco_deco(self, arg: Optional[Union[str, Callable]] = None,
                  *sub_event_names: str) -> Callable:
        def deco(func: Callable) -> Callable:
            if isinstance(arg, str):
                e = [type_ + '.' + e for e in [arg] + list(sub_event_names)]
                # self.on(*e)(func)
                deco_method(self, *e)(func)
            else:
                # self.on(type_)(func)
                deco_method(self, type_)(func)
            return func

        if isinstance(arg, Callable):
            return deco(arg)
        return deco

    return deco_deco


class CQHttp(AsyncApi):
    """
    CQHTTP 机器人的主类,负责控制整个机器人的运行、事件处理函数的注册、与 CQHTTP
    的连接、CQHTTP API 的调用等。

    内部维护了一个 `Quart` 对象作为 web 服务器,提供 HTTP 协议的 ``/`` 和 WebSocket
    协议的 ``/ws/``、``/ws/api/``、``/ws/event/`` 端点供 CQHTTP 连接。

    由于基类 `api_impl.AsyncApi` 继承了 `api.Api` 的 `__getattr__`
    魔术方法,因此可以在 bot 对象上直接调用 CQHTTP API,例如:

    ```py
    await bot.send_private_msg(user_id=10001000, message='你好')
    friends = await bot.get_friend_list()
    ```

    也可以通过 `CQHttp.call_action` 方法调用 API,例如:

    ```py
    await bot.call_action('set_group_whole_ban', group_id=10010)
    ```

    两种调用 API 的方法最终都通过 `CQHttp.api` 属性来向 CQHTTP
    发送请求并获取调用结果。
    """

    def __init__(self, import_name: str = '', *,
                 api_root: Optional[str] = None,
                 access_token: Optional[str] = None,
                 secret: Optional[AnyStr] = None,
                 message_class: Optional[type] = None,
                 api_timeout_sec: Optional[float] = None,
                 server_app_kwargs: Optional[dict] = None,
                 **kwargs):
        """
        ``import_name`` 参数为当前模块(使用 `CQHttp` 的模块)的导入名,通常传入
        ``__name__`` 或不传入。

        ``api_root`` 参数为 CQHTTP API 的 URL,``access_token`` 和
        ``secret`` 参数为 CQHTTP 配置中填写的对应项。

        ``message_class`` 参数为要用来对 `Event.message` 进行转换的消息类,可使用
        `Message`,例如:

        ```py
        from aiocqhttp import CQHttp, Message

        bot = CQHttp(message_class=Message)

        @bot.on_message
        async def handler(event):
            # 这里 event.message 已经被转换为 Message 对象
            assert isinstance(event.message, Message)
        ```

        ``api_timeout_sec`` 参数用于设置 CQHTTP API 请求的超时时间,单位是秒。

        ``server_app_kwargs`` 参数用于配置 `Quart` 对象,将以命名参数形式传给入其初始化函数。
        """
        self._api = UnifiedApi()
        self._sync_api = None
        self._bus = EventBus()
        self._before_sending_funcs = set()
        self._loop = None

        self._server_app = Quart(import_name, **(server_app_kwargs or {}))
        self._server_app.before_serving(self._before_serving)
        self._server_app.add_url_rule('/', methods=['POST'],
                                      view_func=self._handle_http_event)
        for p in ('/ws', '/ws/event', '/ws/api'):
            self._server_app.add_websocket(p, strict_slashes=False,
                                           view_func=self._handle_wsr)

        self._configure(
            api_root,
            access_token,
            secret,
            message_class,
            api_timeout_sec
        )

    def _configure(self,
                   api_root: Optional[str] = None,
                   access_token: Optional[str] = None,
                   secret: Optional[AnyStr] = None,
                   message_class: Optional[type] = None,
                   api_timeout_sec: Optional[float] = None):
        self._message_class = message_class
        api_timeout_sec = api_timeout_sec or 60  # wait for 60 secs by default
        self._access_token = access_token
        self._secret = secret
        self._api._http_api = HttpApi(api_root, access_token, api_timeout_sec)
        self._wsr_api_clients = {}  # connected wsr api clients
        self._api._wsr_api = WebSocketReverseApi(
            self._wsr_api_clients, api_timeout_sec)

    async def _before_serving(self):
        self._loop = asyncio.get_running_loop()

    @property
    def asgi(self) -> Callable[[dict, Callable, Callable], Awaitable]:
        """ASGI app 对象,可使用支持 ASGI 的 web 服务器软件部署。"""
        return self._server_app

    @property
    def server_app(self) -> Quart:
        """Quart app 对象,可用来对 Quart 的运行做精细控制,或添加新的路由等。"""
        return self._server_app

    @property
    def logger(self) -> logging.Logger:
        """Quart app 的 logger,等价于 ``bot.server_app.logger``。"""
        return self._server_app.logger

    @property
    def loop(self) -> Optional[asyncio.AbstractEventLoop]:
        """Quart app 所在的 event loop,在 app 运行之前为 `None`。"""
        return self._loop

    @property
    def api(self) -> AsyncApi:
        """`api_impl.AsyncApi` 对象,用于异步地调用 CQHTTP API。"""
        return self._api

    @property
    def sync(self) -> SyncApi:
        """
        `api_impl.SyncApi` 对象,用于同步地调用 CQHTTP API,例如:

        ```py
        @bot.on_message('group')
        def sync_handler(event):
            user_info = bot.sync.get_group_member_info(
                group_id=event.group_id, user_id=event.user_id
            )
            ...
        ```
        """
        if not self._sync_api:
            if not self._loop:
                raise TimingError('attempt to access sync api '
                                  'before bot is running')
            self._sync_api = SyncApi(self._api, self._loop)
        return self._sync_api

    def run(self, host: str = None, port: int = None, *args, **kwargs) -> None:
        """运行 bot 对象,实际就是运行 Quart app,参数与 `Quart.run` 一致。"""
        if 'use_reloader' not in kwargs:
            kwargs['use_reloader'] = False
        self._server_app.run(host=host, port=port, *args, **kwargs)

    def run_task(self, host: str = None, port: int = None,
                 *args, **kwargs) -> Coroutine[None, None, None]:
        if 'use_reloader' not in kwargs:
            kwargs['use_reloader'] = False
        return self._server_app.run_task(host=host, port=port, *args, **kwargs)

    async def call_action(self, action: str, **params) -> Any:
        """
        通过内部维护的 `api_impl.AsyncApi` 具体实现类调用 CQHTTP API,``action``
        为要调用的 API 动作名,``**params`` 为 API 所需参数。
        """
        return await self._api.call_action(action=action, **params)

    async def send(self, event: Event, message: Message_T,
                   **kwargs) -> Optional[Dict[str, Any]]:
        """
        向触发事件的主体发送消息。

        ``event`` 参数为事件对象,``message`` 参数为要发送的消息。可额外传入 ``at_sender``
        命名参数用于控制是否 at 事件的触发者,默认为 `False`。其它命名参数作为
        CQHTTP API ``send_msg`` 的参数直接传递。
        """
        msg = message if isinstance(message, Message) else Message(message)
        await run_async_funcs(self._before_sending_funcs, event, msg, kwargs)

        at_sender = kwargs.pop('at_sender', False) and ('user_id' in event)

        keys = {'message_type', 'user_id', 'group_id', 'discuss_id'}
        params = {k: v for k, v in event.items() if k in keys}
        params['message'] = msg
        params.update(kwargs)

        if 'message_type' not in params:
            if 'group_id' in params:
                params['message_type'] = 'group'
            elif 'discuss_id' in params:
                params['message_type'] = 'discuss'
            elif 'user_id' in params:
                params['message_type'] = 'private'

        if at_sender and params['message_type'] != 'private':
            params['message'] = MessageSegment.at(params['user_id']) + \
                                MessageSegment.text(' ') + params['message']

        return await self.send_msg(**params)

    def before_sending(self, func: Callable) -> Callable:
        """
        注册发送消息前的钩子函数,用作装饰器,例如:

        ```py
        @bot.before_sending
        async def hook(event: Event, message: Message, kwargs: Dict[str, Any]):
            message.clear()
            message.append(MessageSegment.text('hooked!'))
        ```

        该钩子函数在刚进入 `CQHttp.send` 函数时运行,用户可在钩子函数中修改要发送的
        ``message`` 和发送参数 ``kwargs``。
        """
        self._before_sending_funcs.add(ensure_async(func))
        return func

    def subscribe(self, event_name: str, func: Callable) -> None:
        """注册事件处理函数。"""
        self._bus.subscribe(event_name, ensure_async(func))

    def unsubscribe(self, event_name: str, func: Callable) -> None:
        """取消注册事件处理函数。"""
        self._bus.unsubscribe(event_name, func)

    def on(self, *event_names: str) -> Callable:
        """
        注册事件处理函数,用作装饰器,例如:

        ```py
        @bot.on('notice.group_decrease', 'notice.group_increase')
        async def handler(event):
            pass
        ```

        参数为要注册的事件名,格式是点号分割的各级事件类型,见 `Event.name`。

        可以多次调用,一个函数可作为多个事件的处理函数,一个事件也可以有多个处理函数。

        可以按不同粒度注册处理函数,例如:

        ```py
        @bot.on('message')
        async def handle_message(event):
            pass

        @bot.on('message.private')
        async def handle_private_message(event):
            pass

        @bot.on('message.private.friend')
        async def handle_friend_private_message(event):
            pass
        ```

        当收到好友私聊消息时,会首先运行 ``handle_friend_private_message``,然后运行
        ``handle_private_message``,最后运行 ``handle_message``。
        """

        def deco(func: Callable) -> Callable:
            for name in event_names:
                self.subscribe(name, func)
            return func

        return deco

    on_message = _deco_maker(on, 'message')
    __pdoc__['CQHttp.on_message'] = """
    注册消息事件处理函数,用作装饰器,例如:

    ```py
    @bot.on_message('private')
    async def handler(event):
        pass
    ```

    这等价于:

    ```py
    @bot.on('message.private')
    async def handler(event):
        pass
    ```

    也可以不加参数,表示注册为所有消息事件的处理函数,例如:

    ```py
    @bot.on_message
    async def handler(event):
        pass
    ```
    """

    on_notice = _deco_maker(on, 'notice')
    __pdoc__['CQHttp.on_notice'] = "注册通知事件处理函数,用作装饰器,用法同上。"

    on_request = _deco_maker(on, 'request')
    __pdoc__['CQHttp.on_request'] = "注册请求事件处理函数,用作装饰器,用法同上。"

    on_meta_event = _deco_maker(on, 'meta_event')
    __pdoc__['CQHttp.on_meta_event'] = "注册元事件处理函数,用作装饰器,用法同上。"

    def hook_before(self, event_name: str, func: Callable) -> None:
        """注册事件处理前的钩子函数。"""
        self._bus.hook_before(event_name, ensure_async(func))

    def unhook_before(self, event_name: str, func: Callable) -> None:
        """取消注册事件处理前的钩子函数。"""
        self._bus.unhook_before(event_name, func)

    def before(self, *event_names: str) -> Callable:
        """
        注册事件处理前的钩子函数,用作装饰器,例如:

        ```py
        @bot.before('notice.group_decrease', 'notice.group_increase')
        async def hook(event):
            pass
        ```

        参数为要注册的事件名,格式是点号分割的各级事件类型,见 `Event.name`。

        钩子函数的注册方法和事件处理函数几乎完全一致,只需将 ``on`` 改为 ``before``。

        各级 before 钩子函数全部运行完成后,才会运行事件处理函数。
        """

        def deco(func: Callable) -> Callable:
            for name in event_names:
                self.hook_before(name, func)
            return func

        return deco

    before_message = _deco_maker(before, 'message')
    __pdoc__['CQHttp.before_message'] = """
    注册消息事件处理前的钩子函数,用作装饰器,例如:

    ```py
    @bot.before_message('private')
    async def hook(event):
        pass
    ```

    这等价于:

    ```py
    @bot.before('message.private')
    async def hook(event):
        pass
    ```

    也可以不加参数,表示注册为所有消息事件处理前的钩子函数,例如:

    ```py
    @bot.before_message
    async def hook(event):
        pass
    ```
    """

    before_notice = _deco_maker(before, 'notice')
    __pdoc__['CQHttp.before_notice'] = "注册通知事件处理前的钩子函数,用作装饰器,用法同上。"

    before_request = _deco_maker(before, 'request')
    __pdoc__['CQHttp.before_request'] = "注册请求事件处理前的钩子函数,用作装饰器,用法同上。"

    before_meta_event = _deco_maker(before, 'meta_event')
    __pdoc__['CQHttp.before_meta_event'] = "注册元事件处理前的钩子函数,用作装饰器,用法同上。"

    def on_startup(self, func: Callable) -> Callable:
        """
        注册 bot 启动时钩子函数,用作装饰器,例如:

        ```py
        @bot.on_startup
        async def startup():
            await db.init()
        ```
        """
        return self.server_app.before_serving(func)

    def on_websocket_connection(self, func: Callable) -> Callable:
        """
        注册 WebSocket 连接元事件处理函数,等价于 ``on_meta_event('lifecycle.connect')``,例如:

        ```py
        @bot.on_websocket_connection
        async def handler(event):
            global groups
            groups = await bot.get_group_list(self_id=event.self_id)
        ```

        注:仅支持 CQHTTP v4.14+。
        """
        return self.on_meta_event('lifecycle.connect')(func)

    async def _handle_http_event(self) -> Response:
        if self._secret:
            if 'X-Signature' not in request.headers:
                self.logger.warning('signature header is missed')
                abort(401)

            sec = self._secret
            sec = sec.encode('utf-8') if isinstance(sec, str) else sec
            sig = hmac.new(sec, await request.get_data(), 'sha1').hexdigest()
            if request.headers['X-Signature'] != 'sha1=' + sig:
                self.logger.warning('signature header is invalid')
                abort(403)

        payload = await request.json
        if not isinstance(payload, dict):
            abort(400)

        if request.headers['X-Self-ID'] in self._wsr_api_clients:
            self.logger.warning(
                'there is already a reverse websocket api connection, '
                'so the event may be handled twice.')

        response = await self._handle_event(payload)
        if isinstance(response, dict):
            return jsonify(response)
        return Response('', 204)

    async def _handle_wsr(self) -> None:
        if self._access_token:
            auth = websocket.headers.get('Authorization', '')
            m = re.fullmatch(r'(?:[Tt]oken|[Bb]earer) (?P<token>\S+)', auth)
            if not m:
                self.logger.warning('authorization header is missed')
                abort(401)

            token_given = m.group('token').strip()
            if token_given != self._access_token:
                self.logger.warning('authorization header is invalid')
                abort(403)

        role = websocket.headers['X-Client-Role'].lower()
        if role == 'event':
            await self._handle_wsr_event()
        elif role == 'api':
            await self._handle_wsr_api()
        elif role == 'universal':
            await self._handle_wsr_universal()

    async def _handle_wsr_event(self) -> None:
        try:
            while True:
                try:
                    payload = json.loads(await websocket.receive())
                except ValueError:
                    payload = None

                if not isinstance(payload, dict):
                    # ignore invalid payload
                    continue

                asyncio.create_task(self._handle_event_with_response(payload))
        finally:
            pass

    async def _handle_wsr_api(self) -> None:
        self._add_wsr_api_client()
        try:
            while True:
                try:
                    ResultStore.add(json.loads(await websocket.receive()))
                except ValueError:
                    pass
        finally:
            self._remove_wsr_api_client()

    async def _handle_wsr_universal(self) -> None:
        self._add_wsr_api_client()
        try:
            while True:
                try:
                    payload = json.loads(await websocket.receive())
                except ValueError:
                    payload = None

                if not isinstance(payload, dict):
                    # ignore invalid payload
                    continue

                if 'post_type' in payload:
                    # is a event
                    asyncio.create_task(
                        self._handle_event_with_response(payload))
                elif payload:
                    # is a api result
                    ResultStore.add(payload)
        finally:
            self._remove_wsr_api_client()

    def _add_wsr_api_client(self) -> None:
        ws = websocket._get_current_object()
        self_id = websocket.headers['X-Self-ID']
        self._wsr_api_clients[self_id] = ws

    def _remove_wsr_api_client(self) -> None:
        self_id = websocket.headers['X-Self-ID']
        if self_id in self._wsr_api_clients:
            # we must check the existence here,
            # because we allow wildcard ws connections,
            # that is, the self_id may be '*'
            del self._wsr_api_clients[self_id]

    async def _handle_event(self, payload: Dict[str, Any]) -> Any:
        ev = Event.from_payload(payload)
        if not ev:
            return

        event_name = ev.name
        self.logger.info(f'received event: {event_name}')

        if self._message_class and 'message' in ev:
            ev['message'] = self._message_class(ev['message'])
        results = list(filter(lambda r: r is not None,
                              await self._bus.emit(event_name, ev)))
        # return the first non-none result
        return results[0] if results else None

    async def _handle_event_with_response(
            self, payload: Dict[str, Any]) -> None:
        response = await self._handle_event(payload)
        if isinstance(response, dict):
            payload.pop('message', None)  # avoid wasting bandwidth
            payload.pop('raw_message', None)
            payload.pop('comment', None)
            payload.pop('sender', None)
            try:
                await self._api.call_action(
                    self_id=payload['self_id'],
                    action='.handle_quick_operation_async',
                    context=payload, operation=response
                )
            except Error:
                pass

Sub-modules

aiocqhttp.api

此模块提供了 CQHTTP API 的接口类。

aiocqhttp.api_impl

此模块提供了 CQHTTP API 相关的实现类。

aiocqhttp.bus

此模块提供事件总线相关类。

aiocqhttp.default

此模块提供了默认 bot 对象及用于控制和使用它的相关函数、对象、和装饰器。

aiocqhttp.event

此模块提供了 CQHTTP 事件相关的类。

aiocqhttp.exceptions

此模块提供了异常类。

aiocqhttp.message

此模块提供了消息相关类。

aiocqhttp.typing

此模块提供了用于类型提示的定义。

aiocqhttp.utils

此模块提供了工具函数。

Classes

class CQHttp (import_name: str = '', *, api_root: Union[str, NoneType] = None, access_token: Union[str, NoneType] = None, secret: Union[~AnyStr, NoneType] = None, message_class: Union[type, NoneType] = None, api_timeout_sec: Union[float, NoneType] = None, server_app_kwargs: Union[dict, NoneType] = None, **kwargs)

CQHTTP 机器人的主类,负责控制整个机器人的运行、事件处理函数的注册、与 CQHTTP 的连接、CQHTTP API 的调用等。

内部维护了一个 Quart 对象作为 web 服务器,提供 HTTP 协议的 / 和 WebSocket 协议的 /ws//ws/api//ws/event/ 端点供 CQHTTP 连接。

由于基类 AsyncApi 继承了 Api__getattr__ 魔术方法,因此可以在 bot 对象上直接调用 CQHTTP API,例如:

await bot.send_private_msg(user_id=10001000, message='你好')
friends = await bot.get_friend_list()

也可以通过 CQHttp.call_action() 方法调用 API,例如:

await bot.call_action('set_group_whole_ban', group_id=10010)

两种调用 API 的方法最终都通过 CQHttp.api 属性来向 CQHTTP 发送请求并获取调用结果。

import_name 参数为当前模块(使用 CQHttp 的模块)的导入名,通常传入 __name__ 或不传入。

api_root 参数为 CQHTTP API 的 URL,access_tokensecret 参数为 CQHTTP 配置中填写的对应项。

message_class 参数为要用来对 Event.message 进行转换的消息类,可使用 Message,例如:

from aiocqhttp import CQHttp, Message

bot = CQHttp(message_class=Message)

@bot.on_message
async def handler(event):
    # 这里 event.message 已经被转换为 Message 对象
    assert isinstance(event.message, Message)

api_timeout_sec 参数用于设置 CQHTTP API 请求的超时时间,单位是秒。

server_app_kwargs 参数用于配置 Quart 对象,将以命名参数形式传给入其初始化函数。

Expand source code
class CQHttp(AsyncApi):
    """
    CQHTTP 机器人的主类,负责控制整个机器人的运行、事件处理函数的注册、与 CQHTTP
    的连接、CQHTTP API 的调用等。

    内部维护了一个 `Quart` 对象作为 web 服务器,提供 HTTP 协议的 ``/`` 和 WebSocket
    协议的 ``/ws/``、``/ws/api/``、``/ws/event/`` 端点供 CQHTTP 连接。

    由于基类 `api_impl.AsyncApi` 继承了 `api.Api` 的 `__getattr__`
    魔术方法,因此可以在 bot 对象上直接调用 CQHTTP API,例如:

    ```py
    await bot.send_private_msg(user_id=10001000, message='你好')
    friends = await bot.get_friend_list()
    ```

    也可以通过 `CQHttp.call_action` 方法调用 API,例如:

    ```py
    await bot.call_action('set_group_whole_ban', group_id=10010)
    ```

    两种调用 API 的方法最终都通过 `CQHttp.api` 属性来向 CQHTTP
    发送请求并获取调用结果。
    """

    def __init__(self, import_name: str = '', *,
                 api_root: Optional[str] = None,
                 access_token: Optional[str] = None,
                 secret: Optional[AnyStr] = None,
                 message_class: Optional[type] = None,
                 api_timeout_sec: Optional[float] = None,
                 server_app_kwargs: Optional[dict] = None,
                 **kwargs):
        """
        ``import_name`` 参数为当前模块(使用 `CQHttp` 的模块)的导入名,通常传入
        ``__name__`` 或不传入。

        ``api_root`` 参数为 CQHTTP API 的 URL,``access_token`` 和
        ``secret`` 参数为 CQHTTP 配置中填写的对应项。

        ``message_class`` 参数为要用来对 `Event.message` 进行转换的消息类,可使用
        `Message`,例如:

        ```py
        from aiocqhttp import CQHttp, Message

        bot = CQHttp(message_class=Message)

        @bot.on_message
        async def handler(event):
            # 这里 event.message 已经被转换为 Message 对象
            assert isinstance(event.message, Message)
        ```

        ``api_timeout_sec`` 参数用于设置 CQHTTP API 请求的超时时间,单位是秒。

        ``server_app_kwargs`` 参数用于配置 `Quart` 对象,将以命名参数形式传给入其初始化函数。
        """
        self._api = UnifiedApi()
        self._sync_api = None
        self._bus = EventBus()
        self._before_sending_funcs = set()
        self._loop = None

        self._server_app = Quart(import_name, **(server_app_kwargs or {}))
        self._server_app.before_serving(self._before_serving)
        self._server_app.add_url_rule('/', methods=['POST'],
                                      view_func=self._handle_http_event)
        for p in ('/ws', '/ws/event', '/ws/api'):
            self._server_app.add_websocket(p, strict_slashes=False,
                                           view_func=self._handle_wsr)

        self._configure(
            api_root,
            access_token,
            secret,
            message_class,
            api_timeout_sec
        )

    def _configure(self,
                   api_root: Optional[str] = None,
                   access_token: Optional[str] = None,
                   secret: Optional[AnyStr] = None,
                   message_class: Optional[type] = None,
                   api_timeout_sec: Optional[float] = None):
        self._message_class = message_class
        api_timeout_sec = api_timeout_sec or 60  # wait for 60 secs by default
        self._access_token = access_token
        self._secret = secret
        self._api._http_api = HttpApi(api_root, access_token, api_timeout_sec)
        self._wsr_api_clients = {}  # connected wsr api clients
        self._api._wsr_api = WebSocketReverseApi(
            self._wsr_api_clients, api_timeout_sec)

    async def _before_serving(self):
        self._loop = asyncio.get_running_loop()

    @property
    def asgi(self) -> Callable[[dict, Callable, Callable], Awaitable]:
        """ASGI app 对象,可使用支持 ASGI 的 web 服务器软件部署。"""
        return self._server_app

    @property
    def server_app(self) -> Quart:
        """Quart app 对象,可用来对 Quart 的运行做精细控制,或添加新的路由等。"""
        return self._server_app

    @property
    def logger(self) -> logging.Logger:
        """Quart app 的 logger,等价于 ``bot.server_app.logger``。"""
        return self._server_app.logger

    @property
    def loop(self) -> Optional[asyncio.AbstractEventLoop]:
        """Quart app 所在的 event loop,在 app 运行之前为 `None`。"""
        return self._loop

    @property
    def api(self) -> AsyncApi:
        """`api_impl.AsyncApi` 对象,用于异步地调用 CQHTTP API。"""
        return self._api

    @property
    def sync(self) -> SyncApi:
        """
        `api_impl.SyncApi` 对象,用于同步地调用 CQHTTP API,例如:

        ```py
        @bot.on_message('group')
        def sync_handler(event):
            user_info = bot.sync.get_group_member_info(
                group_id=event.group_id, user_id=event.user_id
            )
            ...
        ```
        """
        if not self._sync_api:
            if not self._loop:
                raise TimingError('attempt to access sync api '
                                  'before bot is running')
            self._sync_api = SyncApi(self._api, self._loop)
        return self._sync_api

    def run(self, host: str = None, port: int = None, *args, **kwargs) -> None:
        """运行 bot 对象,实际就是运行 Quart app,参数与 `Quart.run` 一致。"""
        if 'use_reloader' not in kwargs:
            kwargs['use_reloader'] = False
        self._server_app.run(host=host, port=port, *args, **kwargs)

    def run_task(self, host: str = None, port: int = None,
                 *args, **kwargs) -> Coroutine[None, None, None]:
        if 'use_reloader' not in kwargs:
            kwargs['use_reloader'] = False
        return self._server_app.run_task(host=host, port=port, *args, **kwargs)

    async def call_action(self, action: str, **params) -> Any:
        """
        通过内部维护的 `api_impl.AsyncApi` 具体实现类调用 CQHTTP API,``action``
        为要调用的 API 动作名,``**params`` 为 API 所需参数。
        """
        return await self._api.call_action(action=action, **params)

    async def send(self, event: Event, message: Message_T,
                   **kwargs) -> Optional[Dict[str, Any]]:
        """
        向触发事件的主体发送消息。

        ``event`` 参数为事件对象,``message`` 参数为要发送的消息。可额外传入 ``at_sender``
        命名参数用于控制是否 at 事件的触发者,默认为 `False`。其它命名参数作为
        CQHTTP API ``send_msg`` 的参数直接传递。
        """
        msg = message if isinstance(message, Message) else Message(message)
        await run_async_funcs(self._before_sending_funcs, event, msg, kwargs)

        at_sender = kwargs.pop('at_sender', False) and ('user_id' in event)

        keys = {'message_type', 'user_id', 'group_id', 'discuss_id'}
        params = {k: v for k, v in event.items() if k in keys}
        params['message'] = msg
        params.update(kwargs)

        if 'message_type' not in params:
            if 'group_id' in params:
                params['message_type'] = 'group'
            elif 'discuss_id' in params:
                params['message_type'] = 'discuss'
            elif 'user_id' in params:
                params['message_type'] = 'private'

        if at_sender and params['message_type'] != 'private':
            params['message'] = MessageSegment.at(params['user_id']) + \
                                MessageSegment.text(' ') + params['message']

        return await self.send_msg(**params)

    def before_sending(self, func: Callable) -> Callable:
        """
        注册发送消息前的钩子函数,用作装饰器,例如:

        ```py
        @bot.before_sending
        async def hook(event: Event, message: Message, kwargs: Dict[str, Any]):
            message.clear()
            message.append(MessageSegment.text('hooked!'))
        ```

        该钩子函数在刚进入 `CQHttp.send` 函数时运行,用户可在钩子函数中修改要发送的
        ``message`` 和发送参数 ``kwargs``。
        """
        self._before_sending_funcs.add(ensure_async(func))
        return func

    def subscribe(self, event_name: str, func: Callable) -> None:
        """注册事件处理函数。"""
        self._bus.subscribe(event_name, ensure_async(func))

    def unsubscribe(self, event_name: str, func: Callable) -> None:
        """取消注册事件处理函数。"""
        self._bus.unsubscribe(event_name, func)

    def on(self, *event_names: str) -> Callable:
        """
        注册事件处理函数,用作装饰器,例如:

        ```py
        @bot.on('notice.group_decrease', 'notice.group_increase')
        async def handler(event):
            pass
        ```

        参数为要注册的事件名,格式是点号分割的各级事件类型,见 `Event.name`。

        可以多次调用,一个函数可作为多个事件的处理函数,一个事件也可以有多个处理函数。

        可以按不同粒度注册处理函数,例如:

        ```py
        @bot.on('message')
        async def handle_message(event):
            pass

        @bot.on('message.private')
        async def handle_private_message(event):
            pass

        @bot.on('message.private.friend')
        async def handle_friend_private_message(event):
            pass
        ```

        当收到好友私聊消息时,会首先运行 ``handle_friend_private_message``,然后运行
        ``handle_private_message``,最后运行 ``handle_message``。
        """

        def deco(func: Callable) -> Callable:
            for name in event_names:
                self.subscribe(name, func)
            return func

        return deco

    on_message = _deco_maker(on, 'message')
    __pdoc__['CQHttp.on_message'] = """
    注册消息事件处理函数,用作装饰器,例如:

    ```py
    @bot.on_message('private')
    async def handler(event):
        pass
    ```

    这等价于:

    ```py
    @bot.on('message.private')
    async def handler(event):
        pass
    ```

    也可以不加参数,表示注册为所有消息事件的处理函数,例如:

    ```py
    @bot.on_message
    async def handler(event):
        pass
    ```
    """

    on_notice = _deco_maker(on, 'notice')
    __pdoc__['CQHttp.on_notice'] = "注册通知事件处理函数,用作装饰器,用法同上。"

    on_request = _deco_maker(on, 'request')
    __pdoc__['CQHttp.on_request'] = "注册请求事件处理函数,用作装饰器,用法同上。"

    on_meta_event = _deco_maker(on, 'meta_event')
    __pdoc__['CQHttp.on_meta_event'] = "注册元事件处理函数,用作装饰器,用法同上。"

    def hook_before(self, event_name: str, func: Callable) -> None:
        """注册事件处理前的钩子函数。"""
        self._bus.hook_before(event_name, ensure_async(func))

    def unhook_before(self, event_name: str, func: Callable) -> None:
        """取消注册事件处理前的钩子函数。"""
        self._bus.unhook_before(event_name, func)

    def before(self, *event_names: str) -> Callable:
        """
        注册事件处理前的钩子函数,用作装饰器,例如:

        ```py
        @bot.before('notice.group_decrease', 'notice.group_increase')
        async def hook(event):
            pass
        ```

        参数为要注册的事件名,格式是点号分割的各级事件类型,见 `Event.name`。

        钩子函数的注册方法和事件处理函数几乎完全一致,只需将 ``on`` 改为 ``before``。

        各级 before 钩子函数全部运行完成后,才会运行事件处理函数。
        """

        def deco(func: Callable) -> Callable:
            for name in event_names:
                self.hook_before(name, func)
            return func

        return deco

    before_message = _deco_maker(before, 'message')
    __pdoc__['CQHttp.before_message'] = """
    注册消息事件处理前的钩子函数,用作装饰器,例如:

    ```py
    @bot.before_message('private')
    async def hook(event):
        pass
    ```

    这等价于:

    ```py
    @bot.before('message.private')
    async def hook(event):
        pass
    ```

    也可以不加参数,表示注册为所有消息事件处理前的钩子函数,例如:

    ```py
    @bot.before_message
    async def hook(event):
        pass
    ```
    """

    before_notice = _deco_maker(before, 'notice')
    __pdoc__['CQHttp.before_notice'] = "注册通知事件处理前的钩子函数,用作装饰器,用法同上。"

    before_request = _deco_maker(before, 'request')
    __pdoc__['CQHttp.before_request'] = "注册请求事件处理前的钩子函数,用作装饰器,用法同上。"

    before_meta_event = _deco_maker(before, 'meta_event')
    __pdoc__['CQHttp.before_meta_event'] = "注册元事件处理前的钩子函数,用作装饰器,用法同上。"

    def on_startup(self, func: Callable) -> Callable:
        """
        注册 bot 启动时钩子函数,用作装饰器,例如:

        ```py
        @bot.on_startup
        async def startup():
            await db.init()
        ```
        """
        return self.server_app.before_serving(func)

    def on_websocket_connection(self, func: Callable) -> Callable:
        """
        注册 WebSocket 连接元事件处理函数,等价于 ``on_meta_event('lifecycle.connect')``,例如:

        ```py
        @bot.on_websocket_connection
        async def handler(event):
            global groups
            groups = await bot.get_group_list(self_id=event.self_id)
        ```

        注:仅支持 CQHTTP v4.14+。
        """
        return self.on_meta_event('lifecycle.connect')(func)

    async def _handle_http_event(self) -> Response:
        if self._secret:
            if 'X-Signature' not in request.headers:
                self.logger.warning('signature header is missed')
                abort(401)

            sec = self._secret
            sec = sec.encode('utf-8') if isinstance(sec, str) else sec
            sig = hmac.new(sec, await request.get_data(), 'sha1').hexdigest()
            if request.headers['X-Signature'] != 'sha1=' + sig:
                self.logger.warning('signature header is invalid')
                abort(403)

        payload = await request.json
        if not isinstance(payload, dict):
            abort(400)

        if request.headers['X-Self-ID'] in self._wsr_api_clients:
            self.logger.warning(
                'there is already a reverse websocket api connection, '
                'so the event may be handled twice.')

        response = await self._handle_event(payload)
        if isinstance(response, dict):
            return jsonify(response)
        return Response('', 204)

    async def _handle_wsr(self) -> None:
        if self._access_token:
            auth = websocket.headers.get('Authorization', '')
            m = re.fullmatch(r'(?:[Tt]oken|[Bb]earer) (?P<token>\S+)', auth)
            if not m:
                self.logger.warning('authorization header is missed')
                abort(401)

            token_given = m.group('token').strip()
            if token_given != self._access_token:
                self.logger.warning('authorization header is invalid')
                abort(403)

        role = websocket.headers['X-Client-Role'].lower()
        if role == 'event':
            await self._handle_wsr_event()
        elif role == 'api':
            await self._handle_wsr_api()
        elif role == 'universal':
            await self._handle_wsr_universal()

    async def _handle_wsr_event(self) -> None:
        try:
            while True:
                try:
                    payload = json.loads(await websocket.receive())
                except ValueError:
                    payload = None

                if not isinstance(payload, dict):
                    # ignore invalid payload
                    continue

                asyncio.create_task(self._handle_event_with_response(payload))
        finally:
            pass

    async def _handle_wsr_api(self) -> None:
        self._add_wsr_api_client()
        try:
            while True:
                try:
                    ResultStore.add(json.loads(await websocket.receive()))
                except ValueError:
                    pass
        finally:
            self._remove_wsr_api_client()

    async def _handle_wsr_universal(self) -> None:
        self._add_wsr_api_client()
        try:
            while True:
                try:
                    payload = json.loads(await websocket.receive())
                except ValueError:
                    payload = None

                if not isinstance(payload, dict):
                    # ignore invalid payload
                    continue

                if 'post_type' in payload:
                    # is a event
                    asyncio.create_task(
                        self._handle_event_with_response(payload))
                elif payload:
                    # is a api result
                    ResultStore.add(payload)
        finally:
            self._remove_wsr_api_client()

    def _add_wsr_api_client(self) -> None:
        ws = websocket._get_current_object()
        self_id = websocket.headers['X-Self-ID']
        self._wsr_api_clients[self_id] = ws

    def _remove_wsr_api_client(self) -> None:
        self_id = websocket.headers['X-Self-ID']
        if self_id in self._wsr_api_clients:
            # we must check the existence here,
            # because we allow wildcard ws connections,
            # that is, the self_id may be '*'
            del self._wsr_api_clients[self_id]

    async def _handle_event(self, payload: Dict[str, Any]) -> Any:
        ev = Event.from_payload(payload)
        if not ev:
            return

        event_name = ev.name
        self.logger.info(f'received event: {event_name}')

        if self._message_class and 'message' in ev:
            ev['message'] = self._message_class(ev['message'])
        results = list(filter(lambda r: r is not None,
                              await self._bus.emit(event_name, ev)))
        # return the first non-none result
        return results[0] if results else None

    async def _handle_event_with_response(
            self, payload: Dict[str, Any]) -> None:
        response = await self._handle_event(payload)
        if isinstance(response, dict):
            payload.pop('message', None)  # avoid wasting bandwidth
            payload.pop('raw_message', None)
            payload.pop('comment', None)
            payload.pop('sender', None)
            try:
                await self._api.call_action(
                    self_id=payload['self_id'],
                    action='.handle_quick_operation_async',
                    context=payload, operation=response
                )
            except Error:
                pass

Ancestors

Instance variables

var asgi

ASGI app 对象,可使用支持 ASGI 的 web 服务器软件部署。

Expand source code
@property
def asgi(self) -> Callable[[dict, Callable, Callable], Awaitable]:
    """ASGI app 对象,可使用支持 ASGI 的 web 服务器软件部署。"""
    return self._server_app
var server_app

Quart app 对象,可用来对 Quart 的运行做精细控制,或添加新的路由等。

Expand source code
@property
def server_app(self) -> Quart:
    """Quart app 对象,可用来对 Quart 的运行做精细控制,或添加新的路由等。"""
    return self._server_app
var logger

Quart app 的 logger,等价于 bot.server_app.logger

Expand source code
@property
def logger(self) -> logging.Logger:
    """Quart app 的 logger,等价于 ``bot.server_app.logger``。"""
    return self._server_app.logger
var loop

Quart app 所在的 event loop,在 app 运行之前为 None

Expand source code
@property
def loop(self) -> Optional[asyncio.AbstractEventLoop]:
    """Quart app 所在的 event loop,在 app 运行之前为 `None`。"""
    return self._loop
var api

AsyncApi 对象,用于异步地调用 CQHTTP API。

Expand source code
@property
def api(self) -> AsyncApi:
    """`api_impl.AsyncApi` 对象,用于异步地调用 CQHTTP API。"""
    return self._api
var sync

SyncApi 对象,用于同步地调用 CQHTTP API,例如:

@bot.on_message('group')
def sync_handler(event):
    user_info = bot.sync.get_group_member_info(
        group_id=event.group_id, user_id=event.user_id
    )
    ...
Expand source code
@property
def sync(self) -> SyncApi:
    """
    `api_impl.SyncApi` 对象,用于同步地调用 CQHTTP API,例如:

    ```py
    @bot.on_message('group')
    def sync_handler(event):
        user_info = bot.sync.get_group_member_info(
            group_id=event.group_id, user_id=event.user_id
        )
        ...
    ```
    """
    if not self._sync_api:
        if not self._loop:
            raise TimingError('attempt to access sync api '
                              'before bot is running')
        self._sync_api = SyncApi(self._api, self._loop)
    return self._sync_api

Methods

def run(self, host: str = None, port: int = None, *args, **kwargs) -> NoneType

运行 bot 对象,实际就是运行 Quart app,参数与 Quart.run 一致。

Expand source code
def run(self, host: str = None, port: int = None, *args, **kwargs) -> None:
    """运行 bot 对象,实际就是运行 Quart app,参数与 `Quart.run` 一致。"""
    if 'use_reloader' not in kwargs:
        kwargs['use_reloader'] = False
    self._server_app.run(host=host, port=port, *args, **kwargs)
def run_task(self, host: str = None, port: int = None, *args, **kwargs) -> Coroutine[NoneType, NoneType, NoneType]
Expand source code
def run_task(self, host: str = None, port: int = None,
             *args, **kwargs) -> Coroutine[None, None, None]:
    if 'use_reloader' not in kwargs:
        kwargs['use_reloader'] = False
    return self._server_app.run_task(host=host, port=port, *args, **kwargs)
async def call_action(self, action: str, **params) -> Any

通过内部维护的 AsyncApi 具体实现类调用 CQHTTP API,action 为要调用的 API 动作名,**params 为 API 所需参数。

Expand source code
async def call_action(self, action: str, **params) -> Any:
    """
    通过内部维护的 `api_impl.AsyncApi` 具体实现类调用 CQHTTP API,``action``
    为要调用的 API 动作名,``**params`` 为 API 所需参数。
    """
    return await self._api.call_action(action=action, **params)
async def send(self, event: Event, message: Union[str, Dict[str, Any], List[Dict[str, Any]], aiocqhttp.message.MessageSegment, aiocqhttp.message.Message], **kwargs) -> Union[Dict[str, Any], NoneType]

向触发事件的主体发送消息。

event 参数为事件对象,message 参数为要发送的消息。可额外传入 at_sender 命名参数用于控制是否 at 事件的触发者,默认为 False。其它命名参数作为 CQHTTP API send_msg 的参数直接传递。

Expand source code
async def send(self, event: Event, message: Message_T,
               **kwargs) -> Optional[Dict[str, Any]]:
    """
    向触发事件的主体发送消息。

    ``event`` 参数为事件对象,``message`` 参数为要发送的消息。可额外传入 ``at_sender``
    命名参数用于控制是否 at 事件的触发者,默认为 `False`。其它命名参数作为
    CQHTTP API ``send_msg`` 的参数直接传递。
    """
    msg = message if isinstance(message, Message) else Message(message)
    await run_async_funcs(self._before_sending_funcs, event, msg, kwargs)

    at_sender = kwargs.pop('at_sender', False) and ('user_id' in event)

    keys = {'message_type', 'user_id', 'group_id', 'discuss_id'}
    params = {k: v for k, v in event.items() if k in keys}
    params['message'] = msg
    params.update(kwargs)

    if 'message_type' not in params:
        if 'group_id' in params:
            params['message_type'] = 'group'
        elif 'discuss_id' in params:
            params['message_type'] = 'discuss'
        elif 'user_id' in params:
            params['message_type'] = 'private'

    if at_sender and params['message_type'] != 'private':
        params['message'] = MessageSegment.at(params['user_id']) + \
                            MessageSegment.text(' ') + params['message']

    return await self.send_msg(**params)
def before_sending(self, func: Callable) -> Callable

注册发送消息前的钩子函数,用作装饰器,例如:

@bot.before_sending
async def hook(event: Event, message: Message, kwargs: Dict[str, Any]):
    message.clear()
    message.append(MessageSegment.text('hooked!'))

该钩子函数在刚进入 CQHttp.send() 函数时运行,用户可在钩子函数中修改要发送的 message 和发送参数 kwargs

Expand source code
def before_sending(self, func: Callable) -> Callable:
    """
    注册发送消息前的钩子函数,用作装饰器,例如:

    ```py
    @bot.before_sending
    async def hook(event: Event, message: Message, kwargs: Dict[str, Any]):
        message.clear()
        message.append(MessageSegment.text('hooked!'))
    ```

    该钩子函数在刚进入 `CQHttp.send` 函数时运行,用户可在钩子函数中修改要发送的
    ``message`` 和发送参数 ``kwargs``。
    """
    self._before_sending_funcs.add(ensure_async(func))
    return func
def subscribe(self, event_name: str, func: Callable) -> NoneType

注册事件处理函数。

Expand source code
def subscribe(self, event_name: str, func: Callable) -> None:
    """注册事件处理函数。"""
    self._bus.subscribe(event_name, ensure_async(func))
def unsubscribe(self, event_name: str, func: Callable) -> NoneType

取消注册事件处理函数。

Expand source code
def unsubscribe(self, event_name: str, func: Callable) -> None:
    """取消注册事件处理函数。"""
    self._bus.unsubscribe(event_name, func)
def on(self, *event_names: str) -> Callable

注册事件处理函数,用作装饰器,例如:

@bot.on('notice.group_decrease', 'notice.group_increase')
async def handler(event):
    pass

参数为要注册的事件名,格式是点号分割的各级事件类型,见 Event.name

可以多次调用,一个函数可作为多个事件的处理函数,一个事件也可以有多个处理函数。

可以按不同粒度注册处理函数,例如:

@bot.on('message')
async def handle_message(event):
    pass

@bot.on('message.private')
async def handle_private_message(event):
    pass

@bot.on('message.private.friend')
async def handle_friend_private_message(event):
    pass

当收到好友私聊消息时,会首先运行 handle_friend_private_message,然后运行 handle_private_message,最后运行 handle_message

Expand source code
def on(self, *event_names: str) -> Callable:
    """
    注册事件处理函数,用作装饰器,例如:

    ```py
    @bot.on('notice.group_decrease', 'notice.group_increase')
    async def handler(event):
        pass
    ```

    参数为要注册的事件名,格式是点号分割的各级事件类型,见 `Event.name`。

    可以多次调用,一个函数可作为多个事件的处理函数,一个事件也可以有多个处理函数。

    可以按不同粒度注册处理函数,例如:

    ```py
    @bot.on('message')
    async def handle_message(event):
        pass

    @bot.on('message.private')
    async def handle_private_message(event):
        pass

    @bot.on('message.private.friend')
    async def handle_friend_private_message(event):
        pass
    ```

    当收到好友私聊消息时,会首先运行 ``handle_friend_private_message``,然后运行
    ``handle_private_message``,最后运行 ``handle_message``。
    """

    def deco(func: Callable) -> Callable:
        for name in event_names:
            self.subscribe(name, func)
        return func

    return deco
def on_message(self, arg: Union[str, Callable, NoneType] = None, *sub_event_names: str) -> Callable

注册消息事件处理函数,用作装饰器,例如:

@bot.on_message('private')
async def handler(event):
    pass

这等价于:

@bot.on('message.private')
async def handler(event):
    pass

也可以不加参数,表示注册为所有消息事件的处理函数,例如:

@bot.on_message
async def handler(event):
    pass
Expand source code
def deco_deco(self, arg: Optional[Union[str, Callable]] = None,
              *sub_event_names: str) -> Callable:
    def deco(func: Callable) -> Callable:
        if isinstance(arg, str):
            e = [type_ + '.' + e for e in [arg] + list(sub_event_names)]
            # self.on(*e)(func)
            deco_method(self, *e)(func)
        else:
            # self.on(type_)(func)
            deco_method(self, type_)(func)
        return func

    if isinstance(arg, Callable):
        return deco(arg)
    return deco
def on_notice(self, arg: Union[str, Callable, NoneType] = None, *sub_event_names: str) -> Callable

注册通知事件处理函数,用作装饰器,用法同上。

Expand source code
def deco_deco(self, arg: Optional[Union[str, Callable]] = None,
              *sub_event_names: str) -> Callable:
    def deco(func: Callable) -> Callable:
        if isinstance(arg, str):
            e = [type_ + '.' + e for e in [arg] + list(sub_event_names)]
            # self.on(*e)(func)
            deco_method(self, *e)(func)
        else:
            # self.on(type_)(func)
            deco_method(self, type_)(func)
        return func

    if isinstance(arg, Callable):
        return deco(arg)
    return deco
def on_request(self, arg: Union[str, Callable, NoneType] = None, *sub_event_names: str) -> Callable

注册请求事件处理函数,用作装饰器,用法同上。

Expand source code
def deco_deco(self, arg: Optional[Union[str, Callable]] = None,
              *sub_event_names: str) -> Callable:
    def deco(func: Callable) -> Callable:
        if isinstance(arg, str):
            e = [type_ + '.' + e for e in [arg] + list(sub_event_names)]
            # self.on(*e)(func)
            deco_method(self, *e)(func)
        else:
            # self.on(type_)(func)
            deco_method(self, type_)(func)
        return func

    if isinstance(arg, Callable):
        return deco(arg)
    return deco
def on_meta_event(self, arg: Union[str, Callable, NoneType] = None, *sub_event_names: str) -> Callable

注册元事件处理函数,用作装饰器,用法同上。

Expand source code
def deco_deco(self, arg: Optional[Union[str, Callable]] = None,
              *sub_event_names: str) -> Callable:
    def deco(func: Callable) -> Callable:
        if isinstance(arg, str):
            e = [type_ + '.' + e for e in [arg] + list(sub_event_names)]
            # self.on(*e)(func)
            deco_method(self, *e)(func)
        else:
            # self.on(type_)(func)
            deco_method(self, type_)(func)
        return func

    if isinstance(arg, Callable):
        return deco(arg)
    return deco
def hook_before(self, event_name: str, func: Callable) -> NoneType

注册事件处理前的钩子函数。

Expand source code
def hook_before(self, event_name: str, func: Callable) -> None:
    """注册事件处理前的钩子函数。"""
    self._bus.hook_before(event_name, ensure_async(func))
def unhook_before(self, event_name: str, func: Callable) -> NoneType

取消注册事件处理前的钩子函数。

Expand source code
def unhook_before(self, event_name: str, func: Callable) -> None:
    """取消注册事件处理前的钩子函数。"""
    self._bus.unhook_before(event_name, func)
def before(self, *event_names: str) -> Callable

注册事件处理前的钩子函数,用作装饰器,例如:

@bot.before('notice.group_decrease', 'notice.group_increase')
async def hook(event):
    pass

参数为要注册的事件名,格式是点号分割的各级事件类型,见 Event.name

钩子函数的注册方法和事件处理函数几乎完全一致,只需将 on 改为 before

各级 before 钩子函数全部运行完成后,才会运行事件处理函数。

Expand source code
def before(self, *event_names: str) -> Callable:
    """
    注册事件处理前的钩子函数,用作装饰器,例如:

    ```py
    @bot.before('notice.group_decrease', 'notice.group_increase')
    async def hook(event):
        pass
    ```

    参数为要注册的事件名,格式是点号分割的各级事件类型,见 `Event.name`。

    钩子函数的注册方法和事件处理函数几乎完全一致,只需将 ``on`` 改为 ``before``。

    各级 before 钩子函数全部运行完成后,才会运行事件处理函数。
    """

    def deco(func: Callable) -> Callable:
        for name in event_names:
            self.hook_before(name, func)
        return func

    return deco
def before_message(self, arg: Union[str, Callable, NoneType] = None, *sub_event_names: str) -> Callable

注册消息事件处理前的钩子函数,用作装饰器,例如:

@bot.before_message('private')
async def hook(event):
    pass

这等价于:

@bot.before('message.private')
async def hook(event):
    pass

也可以不加参数,表示注册为所有消息事件处理前的钩子函数,例如:

@bot.before_message
async def hook(event):
    pass
Expand source code
def deco_deco(self, arg: Optional[Union[str, Callable]] = None,
              *sub_event_names: str) -> Callable:
    def deco(func: Callable) -> Callable:
        if isinstance(arg, str):
            e = [type_ + '.' + e for e in [arg] + list(sub_event_names)]
            # self.on(*e)(func)
            deco_method(self, *e)(func)
        else:
            # self.on(type_)(func)
            deco_method(self, type_)(func)
        return func

    if isinstance(arg, Callable):
        return deco(arg)
    return deco
def before_notice(self, arg: Union[str, Callable, NoneType] = None, *sub_event_names: str) -> Callable

注册通知事件处理前的钩子函数,用作装饰器,用法同上。

Expand source code
def deco_deco(self, arg: Optional[Union[str, Callable]] = None,
              *sub_event_names: str) -> Callable:
    def deco(func: Callable) -> Callable:
        if isinstance(arg, str):
            e = [type_ + '.' + e for e in [arg] + list(sub_event_names)]
            # self.on(*e)(func)
            deco_method(self, *e)(func)
        else:
            # self.on(type_)(func)
            deco_method(self, type_)(func)
        return func

    if isinstance(arg, Callable):
        return deco(arg)
    return deco
def before_request(self, arg: Union[str, Callable, NoneType] = None, *sub_event_names: str) -> Callable

注册请求事件处理前的钩子函数,用作装饰器,用法同上。

Expand source code
def deco_deco(self, arg: Optional[Union[str, Callable]] = None,
              *sub_event_names: str) -> Callable:
    def deco(func: Callable) -> Callable:
        if isinstance(arg, str):
            e = [type_ + '.' + e for e in [arg] + list(sub_event_names)]
            # self.on(*e)(func)
            deco_method(self, *e)(func)
        else:
            # self.on(type_)(func)
            deco_method(self, type_)(func)
        return func

    if isinstance(arg, Callable):
        return deco(arg)
    return deco
def before_meta_event(self, arg: Union[str, Callable, NoneType] = None, *sub_event_names: str) -> Callable

注册元事件处理前的钩子函数,用作装饰器,用法同上。

Expand source code
def deco_deco(self, arg: Optional[Union[str, Callable]] = None,
              *sub_event_names: str) -> Callable:
    def deco(func: Callable) -> Callable:
        if isinstance(arg, str):
            e = [type_ + '.' + e for e in [arg] + list(sub_event_names)]
            # self.on(*e)(func)
            deco_method(self, *e)(func)
        else:
            # self.on(type_)(func)
            deco_method(self, type_)(func)
        return func

    if isinstance(arg, Callable):
        return deco(arg)
    return deco
def on_startup(self, func: Callable) -> Callable

注册 bot 启动时钩子函数,用作装饰器,例如:

@bot.on_startup
async def startup():
    await db.init()
Expand source code
def on_startup(self, func: Callable) -> Callable:
    """
    注册 bot 启动时钩子函数,用作装饰器,例如:

    ```py
    @bot.on_startup
    async def startup():
        await db.init()
    ```
    """
    return self.server_app.before_serving(func)
def on_websocket_connection(self, func: Callable) -> Callable

注册 WebSocket 连接元事件处理函数,等价于 on_meta_event('lifecycle.connect'),例如:

@bot.on_websocket_connection
async def handler(event):
    global groups
    groups = await bot.get_group_list(self_id=event.self_id)

注:仅支持 CQHTTP v4.14+。

Expand source code
def on_websocket_connection(self, func: Callable) -> Callable:
    """
    注册 WebSocket 连接元事件处理函数,等价于 ``on_meta_event('lifecycle.connect')``,例如:

    ```py
    @bot.on_websocket_connection
    async def handler(event):
        global groups
        groups = await bot.get_group_list(self_id=event.self_id)
    ```

    注:仅支持 CQHTTP v4.14+。
    """
    return self.on_meta_event('lifecycle.connect')(func)
class Event (...)

封装从 CQHTTP 收到的事件数据对象(字典),提供属性以获取其中的字段。

typedetail_type 属性对于任何事件都有效外,其它属性存在与否(不存在则返回 None)依事件不同而不同。

Expand source code
class Event(dict):
    """
    封装从 CQHTTP 收到的事件数据对象(字典),提供属性以获取其中的字段。

    除 `type` 和 `detail_type` 属性对于任何事件都有效外,其它属性存在与否(不存在则返回
    `None`)依事件不同而不同。
    """

    @staticmethod
    def from_payload(payload: Dict[str, Any]) -> 'Optional[Event]':
        """
        从 CQHTTP 事件数据构造 `Event` 对象。
        """
        try:
            e = Event(payload)
            _ = e.type, e.detail_type
            return e
        except KeyError:
            return None

    @property
    def type(self) -> str:
        """
        事件类型,有 ``message``、``notice``、``request``、``meta_event`` 等。
        """
        return self['post_type']

    @property
    def detail_type(self) -> str:
        """
        事件具体类型,依 `type` 的不同而不同,以 ``message`` 类型为例,有
        ``private``、``group``、``discuss`` 等。
        """
        return self[f'{self.type}_type']

    @property
    def sub_type(self) -> Optional[str]:
        """
        事件子类型,依 `detail_type` 不同而不同,以 ``message.private`` 为例,有
        ``friend``、``group``、``discuss``、``other`` 等。
        """
        return self.get('sub_type')

    @property
    def name(self):
        """
        事件名,对于有 `sub_type` 的事件,为 ``{type}.{detail_type}.{sub_type}``,否则为
        ``{type}.{detail_type}``。
        """
        n = self.type + '.' + self.detail_type
        if self.sub_type:
            n += '.' + self.sub_type
        return n

    self_id: int  # 机器人自身 ID
    user_id: Optional[int]  # 用户 ID
    operator_id: Optional[int]  # 操作者 ID
    group_id: Optional[int]  # 群 ID
    discuss_id: Optional[int]  # 讨论组 ID
    message_id: Optional[int]  # 消息 ID
    message: Optional[Any]  # 消息
    raw_message: Optional[str]  # 未经 CQHTTP 处理的原始消息
    sender: Optional[Dict[str, Any]]  # 消息发送者信息
    anonymous: Optional[Dict[str, Any]]  # 匿名信息
    file: Optional[Dict[str, Any]]  # 文件信息
    comment: Optional[str]  # 请求验证消息
    flag: Optional[str]  # 请求标识

    def __getattr__(self, key) -> Optional[Any]:
        return self.get(key)

    def __setattr__(self, key, value) -> None:
        self[key] = value

    def __repr__(self) -> str:
        return f'<Event, {super().__repr__()}>'

Ancestors

  • builtins.dict

Static methods

def from_payload(payload: Dict[str, Any]) -> Union[Event, NoneType]

从 CQHTTP 事件数据构造 Event 对象。

Expand source code
@staticmethod
def from_payload(payload: Dict[str, Any]) -> 'Optional[Event]':
    """
    从 CQHTTP 事件数据构造 `Event` 对象。
    """
    try:
        e = Event(payload)
        _ = e.type, e.detail_type
        return e
    except KeyError:
        return None

Instance variables

var type

事件类型,有 messagenoticerequestmeta_event 等。

Expand source code
@property
def type(self) -> str:
    """
    事件类型,有 ``message``、``notice``、``request``、``meta_event`` 等。
    """
    return self['post_type']
var detail_type

事件具体类型,依 type 的不同而不同,以 message 类型为例,有 privategroupdiscuss 等。

Expand source code
@property
def detail_type(self) -> str:
    """
    事件具体类型,依 `type` 的不同而不同,以 ``message`` 类型为例,有
    ``private``、``group``、``discuss`` 等。
    """
    return self[f'{self.type}_type']
var sub_type

事件子类型,依 detail_type 不同而不同,以 message.private 为例,有 friendgroupdiscussother 等。

Expand source code
@property
def sub_type(self) -> Optional[str]:
    """
    事件子类型,依 `detail_type` 不同而不同,以 ``message.private`` 为例,有
    ``friend``、``group``、``discuss``、``other`` 等。
    """
    return self.get('sub_type')
var name

事件名,对于有 sub_type 的事件,为 {type}.{detail_type}.{sub_type},否则为 {type}.{detail_type}

Expand source code
@property
def name(self):
    """
    事件名,对于有 `sub_type` 的事件,为 ``{type}.{detail_type}.{sub_type}``,否则为
    ``{type}.{detail_type}``。
    """
    n = self.type + '.' + self.detail_type
    if self.sub_type:
        n += '.' + self.sub_type
    return n
class Message (msg: Any = None, *args, **kwargs)

消息,即消息段列表。

msg 参数为要转换为 Message 对象的字符串、列表或字典。

Expand source code
class Message(list):
    """
    消息,即消息段列表。
    """

    def __init__(self, msg: Any = None, *args, **kwargs):
        """``msg`` 参数为要转换为 `Message` 对象的字符串、列表或字典。"""
        super().__init__(*args, **kwargs)
        try:
            if isinstance(msg, (list, str)):
                self.extend(msg)
            elif isinstance(msg, dict):
                self.append(msg)
        except ValueError:
            raise ValueError('the msg argument is not recognizable')

    @staticmethod
    def _split_iter(msg_str: str) -> Iterable[MessageSegment]:
        def iter_function_name_and_extra() -> Iterable[Tuple[str, str]]:
            text_begin = 0
            for cqcode in re.finditer(r'\[CQ:(?P<type>[a-zA-Z0-9-_.]+)'
                                      r'(?P<params>'
                                      r'(?:,[a-zA-Z0-9-_.]+=?[^,\]]*)*'
                                      r'),?\]',
                                      msg_str):
                yield 'text', unescape(
                    msg_str[text_begin:cqcode.pos + cqcode.start()])
                text_begin = cqcode.pos + cqcode.end()
                yield cqcode.group('type'), cqcode.group('params').lstrip(',')
            yield 'text', unescape(msg_str[text_begin:])

        for function_name, extra in iter_function_name_and_extra():
            if function_name == 'text':
                if extra:
                    # only yield non-empty text segment
                    yield MessageSegment(type_=function_name,
                                         data={'text': extra})
            else:
                data = {k: v for k, v in map(
                    lambda x: x.split('=', maxsplit=1),
                    filter(lambda x: x, (x.lstrip() for x in extra.split(',')))
                )}
                yield MessageSegment(type_=function_name, data=data)

    def __str__(self):
        return ''.join((str(seg) for seg in self))

    def __add__(self, other: Any):
        result = Message(self)
        try:
            if isinstance(other, Message):
                result.extend(other)
            elif isinstance(other, MessageSegment):
                result.append(other)
            elif isinstance(other, list):
                result.extend(map(lambda d: MessageSegment(d), other))
            elif isinstance(other, dict):
                result.append(MessageSegment(other))
            elif isinstance(other, str):
                result.extend(Message._split_iter(other))
            return result
        except ValueError:
            raise ValueError('the addend is not a valid message')

    def __radd__(self, other: Any):
        try:
            result = Message(other)
            return result.__add__(self)
        except ValueError:
            raise ValueError('the left addend is not a valid message')

    def append(self, obj: Any) -> Any:
        """在消息末尾追加消息段。"""
        try:
            if isinstance(obj, MessageSegment):
                if self and self[-1].type == 'text' and obj.type == 'text':
                    self[-1].data['text'] += obj.data['text']
                elif obj.type != 'text' or obj.data['text'] or not self:
                    super().append(obj)
            else:
                self.append(MessageSegment(obj))
            return self
        except ValueError:
            raise ValueError('the object is not a valid message segment')

    def extend(self, msg: Any) -> Any:
        """在消息末尾追加消息(字符串或消息段列表)。"""
        try:
            if isinstance(msg, str):
                msg = self._split_iter(msg)

            for seg in msg:
                self.append(seg)
            return self
        except ValueError:
            raise ValueError('the object is not a valid message')

    def reduce(self) -> None:
        """
        化简消息,即去除多余消息段、合并相邻纯文本消息段。

        由于 `Message` 类基于 `list`,此方法时间复杂度为 O(n)。
        """
        idx = 0
        while idx < len(self):
            if idx > 0 and \
                    self[idx - 1].type == 'text' and self[idx].type == 'text':
                self[idx - 1].data['text'] += self[idx].data['text']
                del self[idx]
            else:
                idx += 1

    def extract_plain_text(self, reduce: bool = False) -> str:
        """
        提取消息中的所有纯文本消息段,合并,中间用空格分隔。

        ``reduce`` 参数控制是否在提取之前化简消息。
        """
        if reduce:
            self.reduce()

        result = ''
        for seg in self:
            if seg.type == 'text':
                result += ' ' + seg.data['text']
        if result:
            result = result[1:]
        return result

Ancestors

  • builtins.list

Methods

def append(self, obj: Any) -> Any

在消息末尾追加消息段。

Expand source code
def append(self, obj: Any) -> Any:
    """在消息末尾追加消息段。"""
    try:
        if isinstance(obj, MessageSegment):
            if self and self[-1].type == 'text' and obj.type == 'text':
                self[-1].data['text'] += obj.data['text']
            elif obj.type != 'text' or obj.data['text'] or not self:
                super().append(obj)
        else:
            self.append(MessageSegment(obj))
        return self
    except ValueError:
        raise ValueError('the object is not a valid message segment')
def extend(self, msg: Any) -> Any

在消息末尾追加消息(字符串或消息段列表)。

Expand source code
def extend(self, msg: Any) -> Any:
    """在消息末尾追加消息(字符串或消息段列表)。"""
    try:
        if isinstance(msg, str):
            msg = self._split_iter(msg)

        for seg in msg:
            self.append(seg)
        return self
    except ValueError:
        raise ValueError('the object is not a valid message')
def reduce(self) -> NoneType

化简消息,即去除多余消息段、合并相邻纯文本消息段。

由于 Message 类基于 list,此方法时间复杂度为 O(n)。

Expand source code
def reduce(self) -> None:
    """
    化简消息,即去除多余消息段、合并相邻纯文本消息段。

    由于 `Message` 类基于 `list`,此方法时间复杂度为 O(n)。
    """
    idx = 0
    while idx < len(self):
        if idx > 0 and \
                self[idx - 1].type == 'text' and self[idx].type == 'text':
            self[idx - 1].data['text'] += self[idx].data['text']
            del self[idx]
        else:
            idx += 1
def extract_plain_text(self, reduce: bool = False) -> str

提取消息中的所有纯文本消息段,合并,中间用空格分隔。

reduce 参数控制是否在提取之前化简消息。

Expand source code
def extract_plain_text(self, reduce: bool = False) -> str:
    """
    提取消息中的所有纯文本消息段,合并,中间用空格分隔。

    ``reduce`` 参数控制是否在提取之前化简消息。
    """
    if reduce:
        self.reduce()

    result = ''
    for seg in self:
        if seg.type == 'text':
            result += ' ' + seg.data['text']
    if result:
        result = result[1:]
    return result
class MessageSegment (d: Union[Dict[str, Any], NoneType] = None, *, type_: Union[str, NoneType] = None, data: Union[Dict[str, str], NoneType] = None)

消息段,即表示成字典的 CQ 码。

不建议手动构造消息段;建议使用此类的静态方法构造,例如:

at_seg = MessageSegment.at(10001000)

可进行判等和加法操作,例如:

assert at_seg == MessageSegment.at(10001000)
msg: Message = at_seg + MessageSegment.face(14)
Expand source code
class MessageSegment(dict):
    """
    消息段,即表示成字典的 CQ 码。

    不建议手动构造消息段;建议使用此类的静态方法构造,例如:

    ```py
    at_seg = MessageSegment.at(10001000)
    ```

    可进行判等和加法操作,例如:

    ```py
    assert at_seg == MessageSegment.at(10001000)
    msg: Message = at_seg + MessageSegment.face(14)
    ```
    """

    def __init__(self, d: Optional[Dict[str, Any]] = None, *,
                 type_: Optional[str] = None,
                 data: Optional[Dict[str, str]] = None):
        super().__init__()
        if isinstance(d, dict) and d.get('type'):
            self.update(d)
        elif type_:
            self.type = type_
            self.data = data
        else:
            raise ValueError('the "type" field cannot be None or empty')

    def __getitem__(self, item):
        if item not in ('type', 'data'):
            raise KeyError(f'the key "{item}" is not allowed')
        return super().__getitem__(item)

    def __setitem__(self, key, value):
        if key not in ('type', 'data'):
            raise KeyError(f'the key "{key}" is not allowed')
        return super().__setitem__(key, value)

    def __delitem__(self, key):
        raise NotImplementedError

    @property
    def type(self) -> str:
        """
        消息段类型,即 CQ 码功能名。

        纯文本消息段的类型名为 ``text``。
        """
        return self['type']

    @type.setter
    def type(self, type_: str):
        self['type'] = type_

    @property
    def data(self) -> Dict[str, str]:
        """
        消息段数据,即 CQ 码参数。

        该字典内所有值都是未经 CQ 码转义的字符串。
        """
        return self['data']

    @data.setter
    def data(self, data: Optional[Dict[str, str]]):
        self['data'] = data or {}

    def __str__(self):
        if self.type == 'text':
            return escape(self.data.get('text', ''), escape_comma=False)

        params = ','.join(('{}={}'.format(k, escape(str(v)))
                           for k, v in self.data.items()))
        if params:
            params = ',' + params
        return '[CQ:{type}{params}]'.format(type=self.type, params=params)

    def __eq__(self, other):
        if not isinstance(other, MessageSegment):
            return False
        return self.type == other.type and self.data == other.data

    def __add__(self, other: Any):
        return Message(self).__add__(other)

    def __radd__(self, other: Any):
        return Message(self).__radd__(other)

    @staticmethod
    def text(text: str) -> 'MessageSegment':
        """纯文本。"""
        return MessageSegment(type_='text', data={'text': text})

    @staticmethod
    def emoji(id_: int) -> 'MessageSegment':
        """Emoji 表情。"""
        return MessageSegment(type_='emoji', data={'id': str(id_)})

    @staticmethod
    def face(id_: int) -> 'MessageSegment':
        """QQ 表情。"""
        return MessageSegment(type_='face', data={'id': str(id_)})

    @staticmethod
    def image(file: str, destruct: bool = False) -> 'MessageSegment':
        """图片。"""
        if destruct:
            return MessageSegment(type_='image', data={
                'file': file,
                'destruct': '1'
            })
        return MessageSegment(type_='image', data={'file': file})

    @staticmethod
    def record(file: str, magic: bool = False) -> 'MessageSegment':
        """语音。"""
        return MessageSegment(type_='record', data={
            'file': file,
            'magic': _b2s(magic)
        })

    @staticmethod
    def at(user_id: int) -> 'MessageSegment':
        """@某人。"""
        return MessageSegment(type_='at', data={'qq': str(user_id)})

    @staticmethod
    def rps() -> 'MessageSegment':
        """猜拳魔法表情。"""
        return MessageSegment(type_='rps')

    @staticmethod
    def dice() -> 'MessageSegment':
        """掷骰子魔法表情。"""
        return MessageSegment(type_='dice')

    @staticmethod
    def shake() -> 'MessageSegment':
        """戳一戳。"""
        return MessageSegment(type_='shake')

    @staticmethod
    def anonymous(ignore_failure: bool = False) -> 'MessageSegment':
        """匿名发消息。"""
        return MessageSegment(type_='anonymous',
                              data={'ignore': _b2s(ignore_failure)})

    @staticmethod
    def share(url: str, title: str, content: str = '',
              image_url: str = '') -> 'MessageSegment':
        """链接分享。"""
        return MessageSegment(type_='share', data={
            'url': url,
            'title': title,
            'content': content,
            'image': image_url
        })

    @staticmethod
    def contact_user(id_: int) -> 'MessageSegment':
        """推荐好友。"""
        return MessageSegment(type_='contact', data={
            'type': 'qq',
            'id': str(id_)
        })

    @staticmethod
    def contact_group(id_: int) -> 'MessageSegment':
        """推荐群。"""
        return MessageSegment(type_='contact', data={
            'type': 'group',
            'id': str(id_)
        })

    @staticmethod
    def location(latitude: float, longitude: float, title: str = '',
                 content: str = '') -> 'MessageSegment':
        """位置。"""
        return MessageSegment(type_='location', data={
            'lat': str(latitude),
            'lon': str(longitude),
            'title': title,
            'content': content
        })

    @staticmethod
    def music(type_: str, id_: int,
              style: Optional[int] = None) -> 'MessageSegment':
        """音乐"""
        if style is not None:
            return MessageSegment(type_='music', data={
                'type': type_,
                'id': str(id_),
                'style': str(style)
            })
        return MessageSegment(type_='music', data={
            'type': type_,
            'id': str(id_)
        })

    @staticmethod
    def music_custom(url: str, audio_url: str, title: str, content: str = '',
                     image_url: str = '') -> 'MessageSegment':
        """音乐自定义分享。"""
        return MessageSegment(type_='music', data={
            'type': 'custom',
            'url': url,
            'audio': audio_url,
            'title': title,
            'content': content,
            'image': image_url
        })

Ancestors

  • builtins.dict

Static methods

def text(text: str) -> MessageSegment

纯文本。

Expand source code
@staticmethod
def text(text: str) -> 'MessageSegment':
    """纯文本。"""
    return MessageSegment(type_='text', data={'text': text})
def emoji(id_: int) -> MessageSegment

Emoji 表情。

Expand source code
@staticmethod
def emoji(id_: int) -> 'MessageSegment':
    """Emoji 表情。"""
    return MessageSegment(type_='emoji', data={'id': str(id_)})
def face(id_: int) -> MessageSegment

QQ 表情。

Expand source code
@staticmethod
def face(id_: int) -> 'MessageSegment':
    """QQ 表情。"""
    return MessageSegment(type_='face', data={'id': str(id_)})
def image(file: str, destruct: bool = False) -> MessageSegment

图片。

Expand source code
@staticmethod
def image(file: str, destruct: bool = False) -> 'MessageSegment':
    """图片。"""
    if destruct:
        return MessageSegment(type_='image', data={
            'file': file,
            'destruct': '1'
        })
    return MessageSegment(type_='image', data={'file': file})
def record(file: str, magic: bool = False) -> MessageSegment

语音。

Expand source code
@staticmethod
def record(file: str, magic: bool = False) -> 'MessageSegment':
    """语音。"""
    return MessageSegment(type_='record', data={
        'file': file,
        'magic': _b2s(magic)
    })
def at(user_id: int) -> MessageSegment

@某人。

Expand source code
@staticmethod
def at(user_id: int) -> 'MessageSegment':
    """@某人。"""
    return MessageSegment(type_='at', data={'qq': str(user_id)})
def rps() -> MessageSegment

猜拳魔法表情。

Expand source code
@staticmethod
def rps() -> 'MessageSegment':
    """猜拳魔法表情。"""
    return MessageSegment(type_='rps')
def dice() -> MessageSegment

掷骰子魔法表情。

Expand source code
@staticmethod
def dice() -> 'MessageSegment':
    """掷骰子魔法表情。"""
    return MessageSegment(type_='dice')
def shake() -> MessageSegment

戳一戳。

Expand source code
@staticmethod
def shake() -> 'MessageSegment':
    """戳一戳。"""
    return MessageSegment(type_='shake')
def anonymous(ignore_failure: bool = False) -> MessageSegment

匿名发消息。

Expand source code
@staticmethod
def anonymous(ignore_failure: bool = False) -> 'MessageSegment':
    """匿名发消息。"""
    return MessageSegment(type_='anonymous',
                          data={'ignore': _b2s(ignore_failure)})
def share(url: str, title: str, content: str = '', image_url: str = '') -> MessageSegment

链接分享。

Expand source code
@staticmethod
def share(url: str, title: str, content: str = '',
          image_url: str = '') -> 'MessageSegment':
    """链接分享。"""
    return MessageSegment(type_='share', data={
        'url': url,
        'title': title,
        'content': content,
        'image': image_url
    })
def contact_user(id_: int) -> MessageSegment

推荐好友。

Expand source code
@staticmethod
def contact_user(id_: int) -> 'MessageSegment':
    """推荐好友。"""
    return MessageSegment(type_='contact', data={
        'type': 'qq',
        'id': str(id_)
    })
def contact_group(id_: int) -> MessageSegment

推荐群。

Expand source code
@staticmethod
def contact_group(id_: int) -> 'MessageSegment':
    """推荐群。"""
    return MessageSegment(type_='contact', data={
        'type': 'group',
        'id': str(id_)
    })
def location(latitude: float, longitude: float, title: str = '', content: str = '') -> MessageSegment

位置。

Expand source code
@staticmethod
def location(latitude: float, longitude: float, title: str = '',
             content: str = '') -> 'MessageSegment':
    """位置。"""
    return MessageSegment(type_='location', data={
        'lat': str(latitude),
        'lon': str(longitude),
        'title': title,
        'content': content
    })
def music(type_: str, id_: int, style: Union[int, NoneType] = None) -> MessageSegment

音乐

Expand source code
@staticmethod
def music(type_: str, id_: int,
          style: Optional[int] = None) -> 'MessageSegment':
    """音乐"""
    if style is not None:
        return MessageSegment(type_='music', data={
            'type': type_,
            'id': str(id_),
            'style': str(style)
        })
    return MessageSegment(type_='music', data={
        'type': type_,
        'id': str(id_)
    })
def music_custom(url: str, audio_url: str, title: str, content: str = '', image_url: str = '') -> MessageSegment

音乐自定义分享。

Expand source code
@staticmethod
def music_custom(url: str, audio_url: str, title: str, content: str = '',
                 image_url: str = '') -> 'MessageSegment':
    """音乐自定义分享。"""
    return MessageSegment(type_='music', data={
        'type': 'custom',
        'url': url,
        'audio': audio_url,
        'title': title,
        'content': content,
        'image': image_url
    })

Instance variables

var type

消息段类型,即 CQ 码功能名。

纯文本消息段的类型名为 text

Expand source code
@property
def type(self) -> str:
    """
    消息段类型,即 CQ 码功能名。

    纯文本消息段的类型名为 ``text``。
    """
    return self['type']
var data

消息段数据,即 CQ 码参数。

该字典内所有值都是未经 CQ 码转义的字符串。

Expand source code
@property
def data(self) -> Dict[str, str]:
    """
    消息段数据,即 CQ 码参数。

    该字典内所有值都是未经 CQ 码转义的字符串。
    """
    return self['data']
class Error (...)

aiocqhttp 所有异常的基类。

Expand source code
class Error(Exception):
    """`aiocqhttp` 所有异常的基类。"""
    pass

Ancestors

  • builtins.Exception
  • builtins.BaseException

Subclasses

class ApiNotAvailable (...)

CQHTTP API 不可用。

Expand source code
class ApiNotAvailable(Error):
    """CQHTTP API 不可用。"""
    pass

Ancestors

  • Error
  • builtins.Exception
  • builtins.BaseException
class ApiError (...)

调用 CQHTTP API 发生错误。

Expand source code
class ApiError(Error, RuntimeError):
    """调用 CQHTTP API 发生错误。"""
    pass

Ancestors

  • Error
  • builtins.RuntimeError
  • builtins.Exception
  • builtins.BaseException

Subclasses

class HttpFailed (status_code: int)

HTTP 请求响应码不是 2xx。

Expand source code
class HttpFailed(ApiError):
    """HTTP 请求响应码不是 2xx。"""

    def __init__(self, status_code: int):
        self.status_code = status_code
        """HTTP 响应码。"""

    def __repr__(self):
        return f'<HttpFailed, status_code={self.status_code}>'

    def __str__(self):
        return self.__repr__()

Ancestors

  • ApiError
  • Error
  • builtins.RuntimeError
  • builtins.Exception
  • builtins.BaseException

Instance variables

var status_code

HTTP 响应码。

class ActionFailed (retcode: int)

CQHTTP 已收到 API 请求,但执行失败。

except ActionFailed as e:
    if e.retcode > 0:
        pass  # error code returned by CQHTTP
    elif e.retcode < 0:
        pass  # error code returned by CoolQ
Expand source code
class ActionFailed(ApiError):
    """
    CQHTTP 已收到 API 请求,但执行失败。

    ```py
    except ActionFailed as e:
        if e.retcode > 0:
            pass  # error code returned by CQHTTP
        elif e.retcode < 0:
            pass  # error code returned by CoolQ
    ```
    """

    def __init__(self, retcode: int):
        self.retcode = retcode
        """返回码,若大于 0 则是由 CQHTTP 返回,若小于 0 则是由 酷Q 返回。"""

    def __repr__(self):
        return f'<ActionFailed, retcode={self.retcode}>'

    def __str__(self):
        return self.__repr__()

Ancestors

  • ApiError
  • Error
  • builtins.RuntimeError
  • builtins.Exception
  • builtins.BaseException

Instance variables

var retcode

返回码,若大于 0 则是由 CQHTTP 返回,若小于 0 则是由 酷Q 返回。

class NetworkError (...)

网络错误。

Expand source code
class NetworkError(Error, IOError):
    """网络错误。"""
    pass

Ancestors

  • Error
  • builtins.OSError
  • builtins.Exception
  • builtins.BaseException
class TimingError (...)

时机错误。

Expand source code
class TimingError(Error):
    """时机错误。"""
    pass

Ancestors

  • Error
  • builtins.Exception
  • builtins.BaseException