Source code for uv.handles.stream

# -*- coding: utf-8 -*-

# Copyright (C) 2016, Maximilian Köhl <mail@koehlma.de>
#
# This program is free software: you can redistribute it and/or modify it under
# the terms of the GNU Lesser General Public License version 3 as published by
# the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
# PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License along
# with this program. If not, see <http://www.gnu.org/licenses/>.

from __future__ import absolute_import, division, print_function, unicode_literals

from .. import abstract, base, common, error, handle, library, request
from ..library import ffi, lib


@base.request_callback('uv_shutdown_cb')
def uv_shutdown_cb(shutdown_request, status):
    """
    :type shutdown_request:
        uv.ShutdownRequest
    :type status:
        int
    """
    shutdown_request.on_shutdown(shutdown_request, error.StatusCodes.get(status))


@request.RequestType.SHUTDOWN
[docs]class ShutdownRequest(request.UVRequest): """ Request to shutdown the outgoing side of a duplex stream. It waits for pending write requests to complete. :raises uv.UVError: error while initializing the request :raises uv.ClosedHandleError: stream has already been closed or is closing :param stream: stream to shutdown :param on_shutdown: callback which should run after shutdown has been completed :type stream: uv.UVStream :type on_shutdown: ((uv.ShutdownRequest, uv.StatusCodes) -> None) | ((Any, uv.ShutdownRequest, uv.StatusCodes) -> None) """ __slots__ = ['uv_shutdown', 'stream', 'on_shutdown'] uv_request_type = 'uv_shutdown_t*' uv_request_init = lib.uv_shutdown def __init__(self, stream, on_shutdown=None): if stream.closing: raise error.ClosedHandleError() self.stream = stream """ Stream to shutdown. :readonly: True :type: uv.UVStream """ self.on_shutdown = on_shutdown or common.dummy_callback """ Callback which should run after shutdown has been completed. .. function:: on_shutdown(shutdown_request, status) :param shutdown_request: request the call originates from :param status: status of the shutdown request :type shutdown_request: uv.ShutdownRequest :type status: uv.StatusCodes :readonly: False :type: ((uv.ShutdownRequest, uv.StatusCodes) -> None) | ((Any, uv.ShutdownRequest, uv.StatusCodes) -> None) """ arguments = uv_shutdown_cb, super(ShutdownRequest, self).__init__(stream.loop, arguments, stream.uv_stream)
@base.request_callback('uv_write_cb') def uv_write_cb(write_request, status): """ :type write_request: uv.WriteRequest :type status: int """ write_request.on_write(write_request, error.StatusCodes.get(status)) @request.RequestType.WRITE
[docs]class WriteRequest(request.UVRequest): """ Request to write data to a stream and, on streams with inter process communication support, to send stream handles. Buffers are written in the given order. :raises uv.UVError: error while initializing the request :raises uv.ClosedHandleError: stream has already been closed or is closing :param stream: stream to write data to :param buffers: data which should be written :param send_stream: stream handle which should be send :param on_write: callback which should run after all data has been written :type stream: uv.UVStream :type buffers: tuple[bytes] | list[bytes] | bytes :type send_stream: uv.TCP | uv.Pipe | None :type on_write: ((uv.WriteRequest, uv.StatusCodes) -> None) | ((Any, uv.WriteRequest, uv.StatusCodes) -> None) """ __slots__ = ['uv_buffers', 'stream', 'send_stream', 'on_write'] uv_request_type = 'uv_write_t*' def __init__(self, stream, buffers, send_stream=None, on_write=None): if stream.closing: raise error.ClosedHandleError() self.uv_buffers = library.make_uv_buffers(buffers) self.stream = stream """ Stream to write data to. :readonly: True :type: uv.UVStream """ self.send_stream = send_stream """ Stream handle which should be send. :readonly: True :type: uv.UVStream | None """ self.on_write = on_write or common.dummy_callback """ Callback which should run after all data has been written. .. function: on_write(write_request, status) :param write_request: request the call originates from :param status: status of the write request :type write_request: uv.WriteRequest :type status: uv.StatusCodes :readonly: False :type: ((uv.WriteRequest, uv.StatusCodes) -> None) | ((Any, uv.WriteRequest, uv.StatusCodes) -> None) """ amount = len(self.uv_buffers) if send_stream is None: arguments = self.uv_buffers, amount, uv_write_cb init = lib.uv_write else: arguments = self.uv_buffers, amount, self.send_stream.uv_stream, uv_write_cb init = lib.uv_write2 super(WriteRequest, self).__init__(stream.loop, arguments, stream.uv_stream, init)
@base.request_callback('uv_connect_cb') def uv_connect_cb(connect_request, status): """ :type connect_request: uv.ConnectRequest :param status: int """ connect_request.on_connect(connect_request, error.StatusCodes.get(status)) @request.RequestType.CONNECT
[docs]class ConnectRequest(request.UVRequest): """ Request to connect to a specific address. .. note:: There is a specific connect request type for every stream type. :param stream: stream to establish a connection on :param on_connect: callback which should run after a connection has been established or on error :type stream: uv.UVStream :type on_connect: ((uv.ConnectRequest, uv.StatusCodes) -> None) | ((Any, uv.ConnectRequest, uv.StatusCodes) -> None) """ __slots__ = ['stream', 'on_connect'] uv_request_type = 'uv_connect_t*' def __init__(self, stream, arguments, on_connect=None): if stream.closing: raise error.ClosedHandleError() uv_handle = stream.base_handle.uv_object arguments = arguments + (uv_connect_cb, ) super(ConnectRequest, self).__init__(stream.loop, arguments, uv_handle) self.stream = stream """ Stream to establish a connection on. :readonly: True :type: uv.UVStream """ self.on_connect = on_connect or common.dummy_callback """ Callback which should run after a connection has been established. .. function: on_connect(connect_request, status) :param connect_request: request the call originates from :param status: status of the connect request :type connect_request: uv.ConnectRequest :type status: uv.StatusCodes :readonly: False :type: ((uv.ConnectRequest, uv.StatusCodes) -> None) | ((Any, uv.ConnectRequest, uv.StatusCodes) -> None) """
@base.handle_callback('uv_connection_cb') def uv_connection_cb(stream_handle, status): """ :type stream_handle: uv.UVStream :type status: int """ stream_handle.on_connection(stream_handle, error.StatusCodes.get(status)) @base.handle_callback('uv_read_cb') def uv_read_cb(stream_handle, length, uv_buffer): """ :type stream_handle: uv.UVStream :type length: int :type uv_buffer: ffi.CData[uv_buf_t*] """ data = stream_handle.loop.allocator.finalize(stream_handle, length, uv_buffer) if length < 0: # pragma: no cover status = error.StatusCodes.get(length) data = b'' else: status = error.StatusCodes.SUCCESS stream_handle.on_read(stream_handle, status, data) @handle.HandleTypes.STREAM
[docs]class UVStream(handle.UVHandle): """ The base class of all libuv based streams. .. note:: This class must not be instantiated directly. Please use the sub-classes for specific communication channels. :param loop: event loop the handle should run on :param ipc: stream should support inter process communication or not :param arguments: arguments passed to the underling libuv initializer :param on_read: callback which should be called when data has been read :param on_connection: callback which should run after a new connection has been made or on error (if stream is in listen mode) :type loop: uv.Loop :type ipc: bool :type arguments: tuple :type on_read: ((uv.UVStream, uv.StatusCodes, bytes) -> None) | ((Any, uv.UVStream, uv.StatusCodes, bytes) -> None) :type on_connection: ((uv.UVStream, uv.StatusCodes, bytes) -> None) | ((Any, uv.UVStream, uv.StatusCodes, bytes) -> None) """ __slots__ = ['uv_stream', 'on_read', 'on_connection', 'ipc'] def __init__(self, loop, ipc, arguments, on_read, on_connection): super(UVStream, self).__init__(loop, arguments) self.uv_stream = ffi.cast('uv_stream_t*', self.base_handle.uv_object) self.on_read = on_read or common.dummy_callback """ Callback which should be called when data has been read. .. note:: Data might be a zero-bytes long bytes object. In contrast to the Python standard library this does not indicate any error, especially not `EOF`. .. function:: on_read(stream_handle, status, data) :param stream_handle: handle the call originates from :param status: status of the handle (indicate any errors) :param data: data which has been read :type stream_handle: uv.UVStream :type status: uv.StatusCodes :type data: bytes | Any :readonly: False :type: ((uv.UVStream, uv.StatusCodes, bytes) -> None) | ((Any, uv.UVStream, uv.StatusCodes, bytes) -> None) """ self.on_connection = on_connection or common.dummy_callback """ Callback which should run after a new connection has been made or on error (if stream is in listen mode). .. function:: on_connection(stream_handle, status) :param stream_handle: handle the call originates from :param status: status of the new connection :type stream_handle: uv.UVStream :type status: uv.StatusCodes :readonly: False :type: ((uv.UVStream, uv.StatusCodes, uv.UVStream) -> None) | ((Any, uv.UVStream, uv.StatusCodes, uv.UVStream) -> None) """ self.ipc = ipc """ Stream does support inter process communication or not. :readonly: True :type: bool """ @property def readable(self): """ :readonly: True :type: bool """ if self.closing: return False return bool(lib.uv_is_readable(self.uv_stream)) @property def writable(self): """ :readonly: True :type: bool """ if self.closing: return False return bool(lib.uv_is_writable(self.uv_stream)) @property def family(self): """ Address family of stream, may be None. :rtype: int | None """ raise NotImplementedError()
[docs] def shutdown(self, on_shutdown=None): """ :type on_shutdown: ((uv.ShutdownRequest, uv.StatusCodes) -> None) | ((Any, uv.ShutdownRequest, uv.StatusCodes) -> None) :rtype: uv.ShutdownRequest """ return ShutdownRequest(self, on_shutdown)
[docs] def listen(self, backlog=5, on_connection=None): """ Start listening for incoming connections. :raises uv.UVError: error while start listening for incoming connections :raises uv.ClosedHandleError: handle has already been closed or is closing :param backlog: number of connections the kernel might queue :param on_connection: callback which should run after a new connection has been made (overrides the current callback if specified) :type backlog: int :type on_connection: ((uv.UVStream, uv.StatusCodes) -> None) | ((Any, uv.UVStream, uv.StatusCodes) -> None) """ if self.closing: raise error.ClosedHandleError() self.on_connection = on_connection or self.on_connection code = lib.uv_listen(self.uv_stream, backlog, uv_connection_cb) if code != error.StatusCodes.SUCCESS: raise error.UVError(code)
[docs] def read_start(self, on_read=None): """ :raises uv.UVError: error while start reading data from the stream :raises uv.ClosedHandleError: handle has already been closed or is closing :type on_read: ((uv.UVStream, uv.StatusCodes, bytes) -> None) | ((Any, uv.UVStream, uv.StatusCodes, bytes) -> None) """ if self.closing: raise error.ClosedHandleError() self.on_read = on_read or self.on_read code = lib.uv_read_start(self.uv_stream, handle.uv_alloc_cb, uv_read_cb) if code != error.StatusCodes.SUCCESS: raise error.UVError(code) self.set_pending()
[docs] def read_stop(self): """ :raises uv.UVError: error while stop reading data from the stream """ if self.closing: return code = lib.uv_read_stop(self.uv_stream) if code != error.StatusCodes.SUCCESS: raise error.UVError(code) self.clear_pending()
[docs] def write(self, buffers, send_stream=None, on_write=None): """ :type buffers: tuple[bytes] | list[bytes] | bytes :type send_stream: uv.TCP | uv.Pipe | None :type on_write: ((uv.WriteRequest, uv.StatusCodes) -> None) | ((Any, uv.WriteRequest, uv.StatusCodes) -> None) :returns: issued write request :rtype: uv.WriteRequest """ return WriteRequest(self, buffers, send_stream, on_write)
[docs] def try_write(self, buffers): """ Immediately write data to the stream without issuing a write request. Throws :class:`uv.error.TemporaryUnavailableError` if data could not be written immediately, otherwise it returns the number of written bytes. :raises uv.UVError: error while writing data :raises uv.ClosedHandleError: handle has already been closed or is closing :raises uv.error.TemporaryUnavailableError: unable to write data immediately :param buffers: data which should be written :type buffers: tuple[bytes] | list[bytes] | bytes :return: number of bytes written :rtype: int """ if self.closing: raise error.ClosedHandleError() uv_buffers = library.make_uv_buffers(buffers) code = lib.uv_try_write(self.uv_stream, uv_buffers, len(uv_buffers)) if code < 0: # pragma: no cover raise error.UVError(code) return code
[docs] def accept(self, cls=None, *arguments, **keywords): """ Accept a new stream. This might be a new client connection or a stream sent by inter process communication. .. warning:: There should be no need to use this method directly, it is mainly for internal purposes. :raises uv.UVError: error while accepting incoming stream :raises uv.ClosedHandleError: handle has already been closed or is closing :param cls: type of the new stream :param arguments: arguments passed to the constructor of the new stream :param keywords: keywords passed to the constructor of the new stream :type cls: type :type arguments: tuple :type keywords: dict :return: new stream connection of type `cls` :rtype: uv.UVStream """ if self.closing: raise error.ClosedHandleError() connection = (cls or type(self))(*arguments, **keywords) code = lib.uv_accept(self.uv_stream, connection.uv_stream) if code != error.StatusCodes.SUCCESS: raise error.UVError(code) return connection
abstract.Stream.register(UVStream)