| Server IP : 54.233.248.239 / Your IP : 172.28.20.13 Web Server : Apache System : Linux ip-172-28-29-189 6.5.0-1014-aws #14~22.04.1-Ubuntu SMP Thu Feb 15 15:27:06 UTC 2024 x86_64 User : www-data ( 33) PHP Version : 7.2.34-43+ubuntu22.04.1+deb.sury.org+1 Disable Function : pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals, MySQL : OFF | cURL : ON | WGET : ON | Perl : ON | Python : OFF | Sudo : ON | Pkexec : ON Directory : /snap/core18/current/usr/lib/python3/dist-packages/requests_unixsocket/ |
Upload File : |
import socket
from requests.adapters import HTTPAdapter
from requests.compat import urlparse, unquote
try:
from requests.packages.urllib3.connection import HTTPConnection
from requests.packages.urllib3.connectionpool import HTTPConnectionPool
except ImportError:
from urllib3.connection import HTTPConnection
from urllib3.connectionpool import HTTPConnectionPool
# The following was adapted from some code from docker-py
# https://github.com/docker/docker-py/blob/master/docker/unixconn/unixconn.py
class UnixHTTPConnection(HTTPConnection):
    """HTTP connection that talks over a unix domain socket instead of TCP."""

    def __init__(self, unix_socket_url, timeout=60):
        """Create an HTTP connection to a unix domain socket

        :param unix_socket_url: A URL with a scheme of 'http+unix' and the
        netloc is a percent-encoded path to a unix domain socket. E.g.:
        'http+unix://%2Ftmp%2Fprofilesvc.sock/status/pid'
        :param timeout: Value handed to ``socket.settimeout()`` when the
        connection is opened.
        """
        # 'localhost' is only a placeholder host for the base class; the
        # real endpoint is the socket path encoded in ``unix_socket_url``.
        HTTPConnection.__init__(self, 'localhost', timeout=timeout)
        self.unix_socket_url = unix_socket_url
        self.timeout = timeout

    def connect(self):
        """Open the unix domain socket and store it on ``self.sock``.

        The socket path is the percent-decoded netloc of
        ``self.unix_socket_url``.

        :raises: whatever the socket operations raise (e.g. ``OSError``
        for a missing path, ``socket.timeout``); the exception is
        propagated unchanged after the half-open socket has been closed.
        """
        sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        try:
            sock.settimeout(self.timeout)
            socket_path = unquote(urlparse(self.unix_socket_url).netloc)
            sock.connect(socket_path)
        except Exception:
            # Do not leak the file descriptor when the connection attempt
            # fails; ``self.sock`` is left untouched in that case.
            sock.close()
            raise
        self.sock = sock
class UnixHTTPConnectionPool(HTTPConnectionPool):
    """Connection pool that hands out ``UnixHTTPConnection`` objects."""

    def __init__(self, socket_path, timeout=60):
        """Set up the pool for one unix-socket URL.

        :param socket_path: Stored as-is and later passed to
        ``UnixHTTPConnection`` as its ``unix_socket_url`` argument.
        :param timeout: Forwarded to the base pool and to every connection
        this pool creates.
        """
        # The base pool is initialised with the placeholder host
        # 'localhost'; the socket location is kept separately below.
        super(UnixHTTPConnectionPool, self).__init__(
            host='localhost', timeout=timeout)
        # Assigned after the base initializer has run, so these are the
        # values ``_new_conn`` reads.
        self.timeout = timeout
        self.socket_path = socket_path

    def _new_conn(self):
        """Return a fresh, not yet connected ``UnixHTTPConnection``."""
        return UnixHTTPConnection(
            unix_socket_url=self.socket_path,
            timeout=self.timeout,
        )
class UnixAdapter(HTTPAdapter):
    """Transport adapter that routes requests through a unix domain socket."""

    def __init__(self, timeout=60):
        """Initialise the adapter.

        :param timeout: Timeout handed to every ``UnixHTTPConnectionPool``
        this adapter creates.
        """
        super(UnixAdapter, self).__init__()
        self.timeout = timeout

    def get_connection(self, socket_path, proxies=None):
        """Return a connection pool for the given URL.

        :param socket_path: Despite its name this is passed straight to
        ``UnixHTTPConnectionPool`` and parsed as a URL here; its scheme is
        used to look up a proxy.
        :param proxies: Optional mapping of scheme to proxy URL.
        :raises ValueError: if ``proxies`` has a truthy entry for the URL's
        (lower-cased) scheme; proxies are not supported.
        """
        proxies = proxies or {}
        proxy = proxies.get(urlparse(socket_path.lower()).scheme)
        if proxy:
            raise ValueError('%s does not support specifying proxies'
                             % self.__class__.__name__)
        return UnixHTTPConnectionPool(socket_path, self.timeout)

    def get_connection_with_tls_context(self, request, verify,
                                        proxies=None, cert=None):
        """Return a connection pool for ``request`` (requests >= 2.32 hook).

        Newer requests releases call this method from ``send()`` instead of
        ``get_connection``. Without this override the base implementation
        would be used and this adapter's pool would never be created.
        ``verify`` and ``cert`` are accepted for interface compatibility
        and ignored, since this method only delegates to
        ``get_connection``.

        :param request: The prepared request; only ``request.url`` is read.
        :param proxies: Optional mapping of scheme to proxy URL.
        :raises ValueError: under the same condition as ``get_connection``.
        """
        return self.get_connection(request.url, proxies)