Source code for uv.dns

# -*- coding: utf-8 -*-

# Copyright (C) 2016, Maximilian Köhl <mail@koehlma.de>
#
# This program is free software: you can redistribute it and/or modify it under
# the terms of the GNU Lesser General Public License version 3 as published by
# the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
# PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License along
# with this program. If not, see <http://www.gnu.org/licenses/>.

from __future__ import absolute_import, division, print_function, unicode_literals

import socket
import warnings

from . import base, common, error, library, request
from .library import ffi, lib


[docs]class AddressFamilies(common.Enumeration): """ Address Families """ UNKNOWN = socket.AF_UNSPEC """ Unknown or unspecified family. :type: uv.AddressFamilies """ INET4 = socket.AF_INET """ Internet protocol version 4 family. :type: uv.AddressFamilies """ INET6 = socket.AF_INET6 """ Internet protocol version 6 family. :type: uv.AddressFamilies """ UNIX = getattr(socket, 'AF_UNIX', -1) """ Unix domain sockets family. :type: uv.AddressFamilies """
[docs]class SocketTypes(common.Enumeration): """ Socket Types """ STREAM = socket.SOCK_STREAM """ Stream socket type. :type: uv.SocketTypes """ DRGAM = socket.SOCK_DGRAM """ Datagram socket type. :type: uv.SocketTypes """ RAW = socket.SOCK_RAW """ Raw socket type. :type: uv.SocketTypes """ RDM = socket.SOCK_RDM """ Pragmatic general multicast socket type. :type: uv.SocketTypes """ SEQPACKET = socket.SOCK_SEQPACKET """ Sequence packet socket type. :type: uv.SocketTypes """
[docs]class SocketProtocols(common.Enumeration): """ Socket Protocols """ TCP = getattr(socket, 'IPPROTO_TCP', 6) """ Transmission control protocol. :type: uv.SocketProtocols """ UDP = getattr(socket, 'IPPROTO_UDP', 17) """ User datagram protocol. :type: uv.SocketProtocols """ RAW = getattr(socket, 'IPPROTO_RAW', 255) """ Raw socket protocol. :type: uv.SocketProtocols """ ICMP4 = getattr(socket, 'IPPROTO_ICMP', 1) """ Internet version 4 control message protocol. :type: uv.SocketProtocols """ ICMP6 = getattr(socket, 'IPPROTO_ICMPV6', 58) """ Internet version 6 control message protocol. :type: uv.SocketProtocols """
[docs]class Address(tuple): """ Internet Protocol Address """ __slots__ = [] def __new__(cls, host, port): return tuple.__new__(cls, (host, port)) @property def host(self): """ Address Host :rtype: unicode """ return self[0] @property def port(self): """ Address Port :rtype: int """ return self[1]
[docs]class Address4(Address): """ Internet protocol version 4 address. """ __slots__ = [] family = AddressFamilies.INET4 """ Address Family """ def __repr__(self): return '<Address4 host="{self.host}", port={self.port}>'.format(self=self)
[docs]class Address6(Address): """ Internet protocol version 6 address. """ __slots__ = [] family = AddressFamilies.INET6 """ Address Family """ def __new__(cls, host, port, flowinfo=0, scope_id=0): return tuple.__new__(cls, (host, port, flowinfo, scope_id)) def __repr__(self): return ('<Address6 host="{self.host}", port={self.port}, ' 'flowinfo={self.flowinfo}, scope_id={self.scope_id}>').format(self=self) @property def flowinfo(self): """ Address Flow Information :rtype: int """ return self[2] @property def scope_id(self): """ Address Scope Id :rtype: int """ return self[3]
[docs]class AddrInfo(tuple): """ Address Information """ __slots__ = [] def __new__(cls, family, socktype, protocol, canonname, address): return tuple.__new__(cls, (AddressFamilies.get(family), SocketTypes.get(socktype), SocketProtocols.get(protocol), canonname, address)) def __repr__(self): return ('<AddressInfo family={self.family!r}, type={self.type!r}, ' 'protocol={self.protocol!r}, canonname="{self.canonname}", ' 'address={self.address!r}>').format(self=self) @property def family(self): """ Address Family :rtype: uv.AddressFamilies | int """ return self[0] @property def socktype(self): """ Socket Type :rtype: uv.SocketTypes | int """ return self[1] @property def protocol(self): """ Socket Protocol :rtype: uv.SocketProtocols | int """ return self[2] @property def canonname(self): """ Canonical Name :rtype: unicode | None """ return self[3] @property def address(self): """ Address :rtype: uv.Address | tuple | None """ return self[4]
[docs]class NameInfo(tuple): """ Name Information """ __slots__ = [] def __new__(cls, hostname, service): return tuple.__new__(cls, (hostname, service)) def __repr__(self): return ('<NameInfo hostname="{self.hostname}", service="{self.service}">' ).format(self=self) @property def hostname(self): """ Hostname :rtype: unicode """ return self[0] @property def service(self): """ Service-Name :rtype: unicode """ return self[1]
def unpack_addrinfo(c_addrinfo): """ :type c_addrinfo: ffi.CDate[struct addrinfo*] """ items, c_next = [], c_addrinfo while c_next: family = c_next.ai_family socktype = c_next.ai_socktype protocol = c_next.ai_protocol if c_next.ai_canonname: canonname = ffi.string(c_next.ai_canonname).decode() else: canonname = None address = unpack_sockaddr(c_next.ai_addr) if c_next.ai_addr else None items.append(AddrInfo(family, socktype, protocol, canonname, address)) c_next = c_next.ai_next if c_addrinfo: lib.uv_freeaddrinfo(c_addrinfo) return items def unpack_sockaddr(c_sockaddr): """ :type c_sockaddr: ffi.CData[struct sockaddr*] """ if c_sockaddr.sa_family == socket.AF_INET: c_sockaddr_in4 = ffi.cast('struct sockaddr_in*', c_sockaddr) c_host = ffi.new('char[16]') port = socket.ntohs(c_sockaddr_in4.sin_port) lib.uv_ip4_name(c_sockaddr_in4, c_host, 16) return Address4(ffi.string(c_host).decode(), port) elif c_sockaddr.sa_family == socket.AF_INET6: c_sockaddr_in6 = ffi.cast('struct sockaddr_in6*', c_sockaddr) c_additional = lib.cross_get_ipv6_additional(c_sockaddr_in6) c_host = ffi.new('char[40]') flowinfo = c_additional.flowinfo scope_id = c_additional.scope_id port = socket.ntohs(c_sockaddr_in6.sin6_port) lib.uv_ip6_name(c_sockaddr_in6, c_host, 40) return Address6(ffi.string(c_host).decode(), port, flowinfo, scope_id) @base.request_callback('uv_getaddrinfo_cb') def uv_getaddrinfo_cb(addrinfo_request, status, _): """ :type addrinfo_request: uv.GetAddrInfo :type status: int :type _: Any """ if status == error.StatusCodes.SUCCESS: addrinfo_request.populate() code = error.StatusCodes.get(status) addrinfo = addrinfo_request.addrinfo addrinfo_request.callback(addrinfo_request, code, addrinfo) @request.RequestType.GETADDRINFO
[docs]class GetAddrInfo(request.UVRequest): """ Request to get address information for specified host and port (service). If no callback is provided the request is executed synchronously. :raises uv.UVError: error while initializing the handle :param host: host to get address information for :param port: port (service) to get address information for :param family: address family hint :param socktype: socket type hint :param protocol: protocol type hint :param flags: flags to configure the behavior of `getaddrinfo()` :param callback: callback which should be called after address information has been fetched or on error :param loop: event loop the request should run on :type host: unicode :type port: int :type family: uv.AddressFamilies | int :type socktype: uv.SocketTypes | int :type protocol: uv.SocketProtocols | int :type flags: int :type callback: ((uv.GetAddrInfo, uv.StatusCodes, list[uv.AddrInfo]) -> None) | ((Any, uv.GetAddrInfo, uv.StatusCodes, list[uv.AddrInfo]) -> None) | None :type loop: uv.Loop """ __slots__ = ['uv_getaddrinfo', 'callback', 'host', 'port', 'hints', 'flags', 'addrinfo'] uv_request_type = 'uv_getaddrinfo_t*' uv_request_init = lib.uv_getaddrinfo def __init__(self, host, port, family=0, socktype=0, protocol=0, flags=0, callback=None, loop=None): c_hints = ffi.new('struct addrinfo*') c_hints.ai_family = family c_hints.ai_socktype = socktype c_hints.ai_protocol = protocol c_hints.ai_flags = flags self.callback = callback or common.dummy_callback """ Callback which should be called after address information has been fetched or on error. .. function:: callback(request, code, addrinfo) :param request: request the call originates from :param code: status of the request :param addrinfo: list of fetched address information objects :type request: uv.GetAddrInfo :type code: uv.StatusCodes :type addrinfo: list[uv.AddrInfo] :readonly: False :type: ((uv.GetAddrInfo, uv.StatusCodes, list[uv.AddrInfo]) -> None) | ((Any, uv.GetAddrInfo, uv.StatusCodes, list[uv.AddrInfo]) -> None) """ self.host = host """ Host to get address information for. :readonly: True :type: unicode """ self.port = port """ Port (service) to get address information for. :readonly: True :type: int """ self.hints = AddrInfo(family, socktype, protocol, None, None) """ Address information hints. :readonly: True :type: uv.AddrInfo """ self.flags = flags """ Flags to configure the behavior of `getaddrinfo()`. :readonly: True :type: int """ self.addrinfo = [] """ Resulting list with address information. Is populated with address information objects after the request has been successfully completed. :readonly: True :type: list[uv.AddrInfo] """ uv_callback = ffi.NULL if callback is None else uv_getaddrinfo_cb arguments = uv_callback, host.encode(), str(port).encode(), c_hints super(GetAddrInfo, self).__init__(loop, arguments) self.uv_getaddrinfo = self.base_request.uv_object if callback is None: self.populate() base.finalize_request(self) def populate(self): """ Populate the address information list. .. warning:: Only for internal purposes! """ if self.uv_getaddrinfo.addrinfo: self.addrinfo = unpack_addrinfo(self.uv_getaddrinfo.addrinfo) self.uv_getaddrinfo.addrinfo = ffi.NULL
[docs]def getaddrinfo(host, port, family=0, socktype=0, protocol=0, flags=0, callback=None, loop=None): """ Get address information for specified host and port (service). See :class:`uv.dns.GetAddrInfo` for parameter descriptions. :type host: unicode :type port: int :type family: uv.AddressFamilies | int :type socktype: uv.SocketTypes | int :type protocol: uv.SocketProtocols | int :type flags: int :type callback: ((uv.GetAddrInfo, uv.StatusCodes, list[uv.AddrInfo]) -> None) | ((Any, uv.GetAddrInfo, uv.StatusCodes, list[uv.AddrInfo]) -> None) | None :type loop: uv.Loop :rtype: uv.GetAddrInfo | list[uv.AddrInfo] """ addrinfo = GetAddrInfo(host, port, family, socktype, protocol, flags, callback, loop) return addrinfo.addrinfo if callback is None else addrinfo
@base.request_callback('uv_getnameinfo_cb') def uv_getnameinfo_cb(nameinfo_request, status, c_hostname, c_service): """ :param nameinfo_request: uv.GetNameInfo :param status: int :param c_hostname: ffi.CData[char*] :param c_service: ffi.CData[char*] """ hostname = ffi.string(c_hostname).decode() service = ffi.string(c_service).decode() code = error.StatusCodes.get(status) nameinfo_request.callback(nameinfo_request, code, hostname, service) @request.RequestType.GETNAMEINFO
[docs]class GetNameInfo(request.UVRequest): """ Request to get name information for specified ip and port. If no callback is provided the request is executed synchronously. :param ip: IP to get name information for :param port: port to get name information for :param flags: flags to configure the behavior of `getnameinfo()` :param callback: callback which should be called after name information has been fetched or on error :param loop: event loop the request should run on :type ip: unicode :type port: int :type flags: int :type callback: ((uv.GetNameInfo, uv.StatusCodes, unicode, unicode) -> None) | ((Any, uv.GetAddrInfo, uv.StatusCodes, unicode, unicode) -> None) | None :type loop: uv.Loop """ __slots__ = ['uv_getnameinfo', 'c_sockaddr', 'callback', 'ip', 'port', 'flags'] uv_request_type = 'uv_getnameinfo_t*' uv_request_init = lib.uv_getnameinfo def __init__(self, ip, port, flags=0, callback=None, loop=None): self.callback = callback """ Callback which should be called after name information has been fetched or on error. .. function:: callback(request, code, hostname, service) :param request: request the call originates from :param code: status of the request :param hostname: hostname corresponding to the given IP :param service: service name corresponding to the given port :type request: uv.GetNameInfo :type code: uv.StatusCodes :type hostname: unicode :type service: unicode :readonly: False :type: ((uv.GetNameInfo, uv.StatusCodes, unicode, unicode) -> None) | ((Any, uv.GetAddrInfo, uv.StatusCodes, unicode, unicode) -> None) """ self.ip = ip """ Ip to get name information for. :readonly: True :type: unicode """ self.port = port """ Port to get name information for. :readonly: True :type: int """ self.flags = flags """ Flags to configure the behavior of `getnameinfo()`. :readonly: True :type: int """ uv_callback = ffi.NULL if callback is None else uv_getnameinfo_cb arguments = (uv_callback, make_c_sockaddr(ip, port), flags) super(GetNameInfo, self).__init__(loop, arguments) self.uv_getnameinfo = self.base_request.uv_object if callback is None: base.finalize_request(self) @property def hostname(self): """ Hostname corresponding to the given IP. :readonly: True :type: unicode """ if self.uv_getnameinfo.host: return ffi.string(self.uv_getnameinfo.host).decode() @property def service(self): """ Service name corresponding to the given port. :readonly: True :return: unicode """ if self.uv_getnameinfo.service: return ffi.string(self.uv_getnameinfo.service).decode()
[docs]def getnameinfo(ip, port, flags=0, callback=None, loop=None): """ Get name information for specified IP and port. See :class:`uv.dns.GetNameInfo` for parameter descriptions. :type ip: unicode :type port: int :type flags: int :type callback: ((uv.GetNameInfo, uv.StatusCodes, unicode, unicode) -> None) | ((Any, uv.GetAddrInfo, uv.StatusCodes, unicode, unicode) -> None) | None :type loop: uv.Loop :rtype: uv.GetNameInfo | uv.NameInfo """ nameinfo = GetNameInfo(ip, port, flags, callback, loop) if callback is None: return NameInfo(nameinfo.hostname, nameinfo.service) return nameinfo
def make_c_sockaddr(ip, port, flowinfo=0, scope_id=0): """ Create a C sockaddr struct form the given information. :type ip: unicode :type port: int :type flowinfo: int :type scope_id: int """ c_storage = ffi.new('struct sockaddr_storage*') c_sockaddr = ffi.cast('struct sockaddr*', c_storage) library.c_require(c_sockaddr, c_storage) c_ip = ip.encode() c_sockaddr_in4 = ffi.cast('struct sockaddr_in*', c_storage) code = lib.uv_ip4_addr(c_ip, port, c_sockaddr_in4) if code == error.StatusCodes.SUCCESS: return c_sockaddr c_sockaddr_in6 = ffi.cast('struct sockaddr_in6*', c_storage) code = lib.uv_ip6_addr(c_ip, port, c_sockaddr_in6) if code != error.StatusCodes.SUCCESS: raise error.UVError(code) lib.cross_set_ipv6_additional(c_sockaddr_in6, flowinfo, scope_id) return c_sockaddr