前言
此处省略 150 字引言, 假装以后会补.
正文
以下功能均已收入 torequests
pip install torequests -U
-
查看当前进程的内存占用. 借助 psutil
-
代码
def print_mem(unit="MB"):
    """Print and return the current process's virtual memory usage.

    Requires `psutil`; if it is missing, only prints a hint and returns None.

    :param unit: one of "B", "KB", "MB", "GB".
    """
    try:
        import os
        import psutil
        bytes_used = float(psutil.Process(os.getpid()).memory_info().vms)
        # divisor for each supported unit (1024-based)
        scales = {"B": 1, "KB": 1024, "MB": 1024 ** 2, "GB": 1024 ** 3}
        amount = bytes_used / scales[unit]
        print("memory usage: %.2f(%s)" % (amount, unit))
        return amount
    except ImportError:
        print("pip install psutil first.")


print_mem()
# memory usage: 8.93(MB)
-
-
将 cURL 命令转为人类可读的 dict 参数, 兼容 requests.request
-
使用原始请求的好处
- 不会因为遗漏请求参数而导致请求失败, 比如 Referer, User-Agent, Accept 等
- 解析结果可以直接使用 requests.request(**result) 来发出请求
-
code
import shlex
import argparse


class _Curl:
    """Holds the argparse-based parser for curl command lines.

    Not meant to be used directly; call :func:`curlparse` instead.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("curl")
    parser.add_argument("url")
    parser.add_argument("-X", "--method", default="get")
    parser.add_argument("-A", "--user-agent")
    parser.add_argument("-u", "--user")  # <user[:password]>
    parser.add_argument("-x", "--proxy")  # proxy.com:port
    parser.add_argument("-d", "--data")
    parser.add_argument("-F", "--form")
    parser.add_argument("--data-binary")
    parser.add_argument("--connect-timeout", type=float)
    # repeated "-H 'Key: value'" flags accumulate into a list
    parser.add_argument("-H", "--header", action="append", default=[])
    parser.add_argument("--compressed", action="store_true")


def curlparse(string, encoding="utf-8"):
    """Parse a curl command line into kwargs compatible with `requests.request`.

    :param string: a single-line curl command, e.g. ``r'''curl ...'''``.
    :param encoding: encoding used when re-encoding the POST body.
    :return: dict with keys like url / headers / method / data / auth /
        timeout, ready to be used as ``requests.request(**result)``.
    """
    assert "\n" not in string, 'curl-string should not contain \\n, try r"...".'
    # a bare URL needs no parsing at all
    if string.startswith("http"):
        return {"url": string, "method": "get"}
    try:
        tokens = shlex.split(string.strip())
    except ValueError as e:
        if str(e) == 'No closing quotation' and string.count("'") % 2 != 0:
            print_info(
                "If `data` has single-quote ('), the `data` should be quote by double-quote, and add the `backslash`(\\) before original \"."
            )
        raise e
    args, unknown = _Curl.parser.parse_known_args(tokens)
    result = {"url": args.url}
    headers = {}
    for raw_header in args.header:
        name, value = raw_header.split(":", 1)
        headers[name.title()] = value.strip()
    if args.user_agent:
        headers["User-Agent"] = args.user_agent
    if headers:
        result["headers"] = headers
    if args.user:
        # pad with "" so a password-less "-u user" still unpacks cleanly
        result["auth"] = tuple(u for u in args.user.split(":", 1) + [""])[:2]
    # if args.proxy:
    #     pass
    body = args.data or args.data_binary or args.form
    if body:
        if body.startswith("$"):
            body = body[1:]
        # any body implies a POST, matching curl's own behavior
        args.method = "post"
        if PY2:
            # python2 cannot round-trip the escapes cleanly, so just replace
            body = body.replace(r'\r', '\r').replace(r'\n', '\n')
        else:
            # interpret backslash escapes (\n, \r, ...) then re-encode
            body = body.encode(
                'latin-1', 'backslashreplace').decode('unicode-escape').encode(encoding)
        result["data"] = body
    result["method"] = args.method.lower()
    if args.connect_timeout:
        result["timeout"] = args.connect_timeout
    return result


print(
    curlparse(
        r'''curl 'https://httpbin.org/get' -H 'Connection: keep-alive' -H 'Cache-Control: max-age=0' -H 'DNT: 1' -H 'Upgrade-Insecure-Requests: 1' -H 'User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/78.0.3904.70 Safari/537.36' -H 'Sec-Fetch-User: ?1' -H 'Accept: text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3' -H 'Sec-Fetch-Site: none' -H 'Sec-Fetch-Mode: navigate' -H 'Accept-Encoding: gzip, deflate, br' -H 'Accept-Language: zh-CN,zh;q=0.9' --compressed'''
    ))
# {'url': 'https://httpbin.org/get', 'headers': {...}, 'method': 'get'}
-
-
将多个可迭代对象合并成一个生成器
-
Python3 中已经默认实现
-
from itertools import chain
-
-
code
def itertools_chain(*iterables):
    """Yield the items of every iterable, one iterable after another.

    Python2 stand-in for ``itertools.chain``; on Python3 simply use
    ``from itertools import chain``.
    """
    for iterable in iterables:
        for item in iterable:
            yield item
-
-
将一个序列均分成 n 等份 & 将一个序列按照长度 n 等分
-
code
from itertools import chain as itertools_chain


def slice_by_size(seq, size):
    """Yield tuples of at most ``size`` consecutive items from ``seq``.

    The final chunk may be shorter; empty chunks are never yielded.
    """
    sentinel = object()  # unique pad value that can never equal a real item
    padded = itertools_chain(seq, [sentinel] * size)
    # zipping `size` references of the same iterator groups items in chunks
    for chunk in zip(*(padded,) * size):
        if sentinel in chunk:
            chunk = tuple(item for item in chunk if item is not sentinel)
        if chunk:
            yield chunk


def slice_into_pieces(seq, n):
    """Yield ``seq`` split into ``n`` pieces of (almost) equal length."""
    total = len(seq)
    # ceiling division without importing math
    piece_size = total // n if total % n == 0 else total // n + 1
    for piece in slice_by_size(seq, piece_size):
        yield piece


n = 3
items = list(range(10))
chunks = slice_by_size(items, n)
print(list(chunks))
# [(0, 1, 2), (3, 4, 5), (6, 7, 8), (9,)]
chunks = slice_into_pieces(items, n)
print(list(chunks))
# [(0, 1, 2, 3), (4, 5, 6, 7), (8, 9)]
# ==================================
# if performance does not matter, slice_by_size can be written simply as:
chunks = (items[i:i + n] for i in range(0, len(items), n))
print(list(chunks))
# [[0, 1, 2], [3, 4, 5], [6, 7, 8], [9]]
-
-
简单地根据时区处理 时间戳 ↔ 当地时间
-
code
import time

# local offset from UTC in whole hours, e.g. +8 for Asia/Shanghai
TIMEZONE = int(-time.timezone / 3600)


def ttime(timestamp=None, tzone=None, fail="", fmt="%Y-%m-%d %H:%M:%S"):
    """Translate a unix timestamp into a human-readable string.

    :param timestamp: seconds since epoch (13-digit millisecond stamps are
        also accepted), ``time.time()`` by default.
    :param tzone: hour offset from UTC; defaults to the local TIMEZONE.
    :param fail: returned when formatting raises.
    :param fmt: strftime format; %z does not work here.
    :rtype: str

    >>> ttime(1486572818.421858323, tzone=8)
    '2017-02-09 00:53:38'
    """
    tzone = TIMEZONE if tzone is None else tzone
    fix_tz = tzone * 3600
    if timestamp is None:
        timestamp = time.time()
    else:
        timestamp = float(timestamp)
    if 1e12 <= timestamp < 1e13:
        # compatible with 13-digit millisecond timestamps
        timestamp = timestamp / 1000
    try:
        # NOTE: the original re-checked `timestamp is None` here — dead code,
        # it is always set above; removed. Bare `except:` narrowed as well.
        return time.strftime(fmt, time.gmtime(timestamp + fix_tz))
    except (ValueError, OverflowError, OSError):
        return fail


def ptime(timestr=None, tzone=None, fail=0, fmt="%Y-%m-%d %H:%M:%S"):
    """Translate a '%Y-%m-%d %H:%M:%S' string into a unix timestamp (int).

    :param timestr: string like '2018-03-15 01:27:56'; defaults to now.
    :param tzone: hour offset from UTC; defaults to the local TIMEZONE.
    :param fail: returned when parsing raises.
    :param fmt: strptime format; %z does not work here.
    :rtype: int

    >>> ptime('2017-02-09 00:53:38', tzone=8)
    1486572818
    """
    tzone = TIMEZONE if tzone is None else tzone
    # compensate both the requested zone and mktime's local-zone assumption
    fix_tz = -(tzone * 3600 + time.timezone)
    #: str(timestr) so datetime.datetime objects work too
    timestr = str(timestr or ttime())
    try:
        return int(time.mktime(time.strptime(timestr, fmt)) + fix_tz)
    except (ValueError, OverflowError):
        return fail


# current local time
print(ttime())
# 2019-10-25 19:12:59
# current time at UTC (tzone=0)
print(ttime(tzone=0))
# 2019-10-25 11:12:59
# current local timestamp
print(ptime())
# 1572001979
print(ptime(ttime()))
# 1572001979
-
-
将单位秒的数字转化成人类可读的字符串或字典
-
code
import time


def split_seconds(seconds):
    """Split a duration in seconds into ``[day, hour, minute, second, ms]``.

    `divisor: 1, 24, 60, 60, 1000` / `units: day, hour, minute, second, ms`

    >>> split_seconds(6666666)
    [77, 3, 51, 6, 0]
    """
    remaining = seconds * 1000  # work in milliseconds
    parts = []
    # strip units from the smallest (ms) up to days
    for divisor in (1000, 60, 60, 24, 1):
        remaining, remainder = divmod(remaining, divisor)
        if divisor == 1:
            # days: whatever is left once h/m/s/ms are stripped
            parts.append(remaining)
        else:
            parts.append(remainder)
    return parts[::-1]


def timeago(seconds=0, accuracy=4, format=0, lang="en"):
    """Translate a duration in seconds into a human-readable string.

    :param seconds: the duration; negative values get a leading "-".
    :param accuracy: how many unit fields to keep (units[:accuracy]).
    :param format: 0 -> led-clock style, 1 -> literal words, 2 -> dict.
    :param lang: "en" or "cn".

    >>> timeago(93245732.0032424, 5)
    '1079 days, 05:35:32,003'
    >>> timeago(93245732.0032424, 4, 1)
    '1079 days 5 hours 35 minutes 32 seconds'
    >>> timeago(-389, 4, 1)
    '-6 minutes 29 seconds 0 ms'
    """
    assert format in [0, 1, 2], ValueError("format arg should be one of 0, 1, 2")
    negative = "-" if seconds < 0 else ""
    seconds = abs(seconds)
    if lang == "en":
        units = ("day", "hour", "minute", "second", "ms")
    elif lang == "cn":
        units = (u"天", u"小时", u"分钟", u"秒", u"毫秒")
    times = split_seconds(seconds)
    if format == 2:
        return dict(zip(units, times))
    day, hour, minute, second, ms = times
    if format == 0:
        if day:
            plural = "s" if day > 1 and lang == "en" else ""
            day_str = "%d %s%s, " % (day, units[0], plural)
        else:
            day_str = ""
        mid_str = ":".join(("%02d" % i for i in (hour, minute, second)))
        if accuracy > 4:
            mid_str += ",%03d" % ms
        return negative + day_str + mid_str
    elif format == 1:
        # index of the first non-zero field (or the last field if all zero)
        first_valid = 0
        for index, value in enumerate(times):
            first_valid = index
            if value > 0:
                break
        fields = []
        for value, unit in zip(times, units):
            plural = "s" if lang == "en" and value > 1 and unit != "ms" else ""
            fields.append("%d %s%s" % (value, unit, plural))
        result_str = " ".join(fields[first_valid:][:accuracy])
        return negative + result_str


print(split_seconds(6666666.66))
# [77.0, 3.0, 51.0, 6.0, 660.0]
print(timeago(93245732.0032424, 5))
# 1079 days, 05:35:32,003
print(timeago(93245732.0032424, 4, 1))
# 1079 days 5 hours 35 minutes 32 seconds
print(timeago(-389, 4, 1))
# -6 minutes 29 seconds 0 ms
print(timeago(-389, 4, 1, 'cn'))
# -6 分钟 29 秒 0 毫秒
-
-
将一个对象转为字符串, 用 utf-8 编码, 并取其 md5
-
code
import hashlib

if not isinstance(range(1), list):
    # python3: alias py2's `unicode` to `str` so str(obj) works either way
    unicode = str


def md5(string, n=32, encoding="utf-8", skip_encode=False):
    """Return the md5 hex digest of str(string), optionally shortened.

    :param string: any object; stringified and encoded unless skip_encode.
    :param n: 32 for the full digest, a smaller number to take the middle
        n chars, or a (start, stop) tuple/list used as a slice.
    :param encoding: encoding used for str(string).
    :param skip_encode: pass `string` to hashlib untouched (must be bytes).

    >>> md5(1, 10)
    '923820dcc5'
    >>> md5('test')
    '098f6bcd4621d373cade4e832627b4f6'
    """
    todo = string if skip_encode else unicode(string).encode(encoding)
    digest = hashlib.md5(todo).hexdigest()
    if n == 32:
        return digest
    elif isinstance(n, (int, float)):
        # cast to int: a float n is explicitly allowed by the isinstance
        # check but float slice indexes raise TypeError on python3
        start = int((32 - n) // 2)
        stop = int((n - 32) // 2)
        return digest[start:stop]
    elif isinstance(n, (tuple, list)):
        return digest[n[0]:n[1]]


print(md5('test'))
# 098f6bcd4621d373cade4e832627b4f6
print(md5([1, 2, 3]))
# 49a5a960c5714c2e29dd1a7e7b950741
print(md5(object))
# 6abc48afdb859b639e0a0e1edd225a99
-
-
一个简简单单的计数器, 实际貌似没什么用…
-
code
class Counts(object):
    """Counter for counting the times been called.

    >>> cc = Counts()
    >>> cc.x
    1
    >>> cc.x
    2
    >>> cc.now
    2
    >>> cc.current
    2
    >>> cc.sub()
    1
    """
    __slots__ = ("start", "step", "current", "total")

    def __init__(self, start=0, step=1):
        self.start = start
        self.step = step
        self.current = start
        self.total = -1  # reserved attribute; not used by the counter itself

    def clear(self):
        """Reset the counter back to its start value."""
        self.current = self.start

    @property
    def x(self):
        """Increment by step and return the new value."""
        return self.add()

    @property
    def s(self):
        """Decrement by step and return the new value."""
        return self.sub()

    @property
    def c(self):
        """Alias of x."""
        return self.x

    @property
    def now(self):
        """Return the current value without changing it."""
        return self.current

    def add(self, num=None):
        # `is None` check instead of `num or self.step`, so that add(0)
        # adds 0 instead of silently adding the step
        self.current += self.step if num is None else num
        return self.current

    def sub(self, num=None):
        self.current -= self.step if num is None else num
        return self.current
-
-
给序列去重并返回一个保留原始顺序的生成器.
-
目前用过的最佳性能还是通过一个 seen set 来判断
-
code
def unique(seq, key=None, return_as=None):
    """Unique the seq, keeping the original order.

    Much faster than the O(n^2) way:
    `lambda seq: (x for index, x in enumerate(seq) if seq.index(x)==index)`

    :param seq: raw sequence.
    :param key: optional function; items whose keys match are duplicates.
    :param return_as: generator by default, or list / set / str...

    >>> a = [1, 2, 3, 4, 2, 3, 4]
    >>> unique(a, return_as=list)
    [1, 2, 3, 4]
    >>> unique(a, return_as=str)
    '1234'
    """
    # NOTE: the docstring previously showed `unique(a, str)`, which actually
    # binds `str` to `key` (not `return_as`) and returns a generator --
    # examples fixed to use the keyword explicitly.
    seen = set()

    def _first_time(item):
        # compute the dedup key only once per item (was evaluated twice)
        marker = key(item) if key else item
        if marker in seen:
            return False
        seen.add(marker)
        return True

    generator = (item for item in seq if _first_time(item))
    if return_as:
        if return_as == str:
            return "".join(map(str, generator))
        else:
            return return_as(generator)
    else:
        # python2 does not support `yield from`, so hand back the generator
        return generator
-
-
通过正则表达式来注册和查找函数, 实现简易模式匹配
-
简单地说, 就是通过装饰器来绑定一个函数和一段正则表达式, 之后按照字符串来找到对应函数.
-
常见用途:
-
用正则表达式注册一个爬虫函数, 方便在遇到符合正则的 URL 时执行对应函数
-
HTTP server 的 route 查询
class Regex(object):
    """Register some objects(like functions) to regular expressions, then look
    them up by matching strings against the registered patterns.

    >>> from torequests.utils import Regex, re
    >>> reg = Regex()
    >>> @reg.register_function('http.*cctv.*')
    ... def mock():
    ...     pass
    ...
    >>> reg.register('http.*HELLOWORLD', 'helloworld', instances='http://helloworld', flags=re.I)
    >>> reg.register('http.*HELLOWORLD2', 'helloworld2', flags=re.I)
    >>> reg.find('http://cctv.com')
    [<function mock at 0x031FC5D0>]
    >>> reg.match('http://helloworld')
    ['helloworld']
    >>> reg.match('non-http://helloworld')
    []
    >>> reg.search('non-http://helloworld')
    ['helloworld']
    >>> len(reg.search('non-http://helloworld2'))
    2
    >>> print(reg.show_all())
    ('http.*cctv.*') =>  => <class 'function'> mock ""
    ('http.*HELLOWORLD', re.IGNORECASE) => http://helloworld => <class 'str'> helloworld
    ('http.*HELLOWORLD2', re.IGNORECASE) =>  => <class 'str'> helloworld2
    >>> reg.fuzzy('non-http://helloworld')
    [('http://helloworld', 95)]
    """

    def __init__(self, ensure_mapping=False):
        """
        :param ensure_mapping: ensure mapping one to one; if False, will
            return all (possibly more than 1) mapped objects as a list."""
        # each item is a (compiled_pattern, registered_object, instances) tuple
        self.container = []
        self.ensure_mapping = ensure_mapping

    def register(self, patterns, obj=None, instances=None, **reg_kwargs):
        """Register one object which can be matched/searched by regex.

        :param patterns: a regex pattern, or a list/tuple/set of patterns.
        :param obj: returned by find/search/match on success.
        :param instances: example string(s) that must hit the patterns;
            used as a sanity check and by :meth:`fuzzy`.
        :param reg_kwargs: kwargs for re.compile (e.g. flags=re.I).
        """
        assert obj, "bool(obj) should be True."
        # normalize both arguments into lists
        patterns = patterns if isinstance(patterns, (list, tuple, set)) else [patterns]
        instances = instances or []
        instances = (
            instances if isinstance(instances, (list, tuple, set)) else [instances]
        )
        for pattern in patterns:
            pattern_compiled = re.compile(pattern, **reg_kwargs)
            self.container.append((pattern_compiled, obj, instances))
            if self.ensure_mapping:
                # check all instances to avoid one-to-many instances.
                self._check_instances()
            else:
                # no need to check all instances.
                for instance in instances:
                    assert self.search(instance) == [obj] or self.match(instance) == [
                        obj
                    ], (
                        "instance %s should fit at least one pattern %s"
                        % (instance, pattern)
                    )

    def register_function(self, patterns, instances=None, **reg_kwargs):
        """Decorator form of :meth:`register`: binds the decorated function."""

        def wrapper(function):
            self.register(patterns, function, instances=instances, **reg_kwargs)
            return function

        return wrapper

    def find(self, string, default=None):
        """Return match result, falling back to search result.

        :rtype: list"""
        return self.match(string) or self.search(string) or default

    def search(self, string, default=None):
        """Use re.search to collect all registered objects hit by `string`.

        :rtype: list"""
        default = default if default else []
        result = [item[1] for item in self.container if item[0].search(string)]
        if self.ensure_mapping:
            # one-to-one mode: more than one hit is a registration error
            assert len(result) < 2, "%s matches more than one pattern: %s" % (
                string,
                result,
            )
        return result if result else default

    def match(self, string, default=None):
        """Use re.match to collect all registered objects hit by `string`.

        :rtype: list"""
        default = default if default else []
        result = [item[1] for item in self.container if item[0].match(string)]
        if self.ensure_mapping:
            assert len(result) < 2, "%s matches more than one pattern: %s" % (
                string,
                result,
            )
        return result if result else default

    def fuzzy(self, key, limit=5):
        """Suggest the closest registered instances for `key` (needs fuzzywuzzy)."""
        instances = [i[2] for i in self.container if i[2]]
        if not instances:
            return
        # flatten the list of instance lists
        instances = sum(instances, [])
        from fuzzywuzzy import process

        maybe = process.extract(key, instances, limit=limit)
        return maybe

    def _check_instances(self):
        # every registered instance must hit at least one pattern
        for item in self.container:
            for instance in item[2]:
                assert self.search(instance) or self.match(
                    instance
                ), "instance %s not fit pattern %s" % (instance, item[0].pattern)

    def show_all(self, as_string=True):
        """Return a readable summary of all registered patterns; python2 will
        not show flags."""
        result = []
        for item in self.container:
            # str(compiled) looks like "re.compile('...', re.I)"; [10:] strips
            # the leading "re.compile" text, keeping "('...', re.I)"
            pattern = str(item[0])[10:] if PY3 else item[0].pattern
            instances = item[2] or []
            value = (
                '%s "%s"' % (item[1].__name__, (item[1].__doc__ or ""))
                if callable(item[1])
                else str(item[1])
            )
            value = "%s %s" % (type(item[1]), value)
            result.append(" => ".join((pattern, ",".join(instances), value)))
        return "\n".join(result) if as_string else result
-
-
-
常用 User-Agent
-
code
class UA:
    """Some common User-Agents for crawlers.

    Android, iPhone, iPad, Firefox, Chrome, IE6, IE9, WeChat (Android/iOS)."""
    __slots__ = ()
    Android = "Mozilla/5.0 (Linux; Android 5.1.1; Nexus 6 Build/LYZ28E) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/63.0.3239.84 Mobile Safari/537.36"
    iPhone = "Mozilla/5.0 (iPhone; CPU iPhone OS 10_3 like Mac OS X) AppleWebKit/602.1.50 (KHTML, like Gecko) CriOS/56.0.2924.75 Mobile/14E5239e Safari/602.1"
    iPad = "Mozilla/5.0 (iPad; CPU OS 9_1 like Mac OS X) AppleWebKit/601.1.46 (KHTML, like Gecko) Version/9.0 Mobile/13B143 Safari/601.1"
    Firefox = (
        "Mozilla/5.0 (Windows NT 10.0; WOW64; rv:54.0) Gecko/20100101 Firefox/54.0"
    )
    Chrome = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/63.0.3239.84 Safari/537.36"
    IE6 = "Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1)"
    # fixed: the string was truncated ("...Trident/5.0;"), missing the ")"
    IE9 = "Mozilla/5.0 (compatible; MSIE 9.0; Windows NT 6.1; Trident/5.0)"
    # fixed: removed " > " markers that leaked in from a quoted paste --
    # a real User-Agent header never contains "> " separators
    WECHAT_ANDROID = "Mozilla/5.0 (Linux; Android 5.0; SM-N9100 Build/LRX21V) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/37.0.0.0 Mobile Safari/537.36 MicroMessenger/6.0.2.56_r958800.520 NetType/WIFI"
    WECHAT_IOS = "Mozilla/5.0 (iPhone; CPU iPhone OS 5_1 like Mac OS X) AppleWebKit/534.46 (KHTML, like Gecko) Mobile/9B176 MicroMessenger/4.3.2"
-
-
简单计时器装置, 支持装饰器, 函数闭包调用, 全局调用
-
原理就是通过 sys._getframe(stack_level).f_code.co_name 拿到栈信息
-
然后默认在 __del__ (也就是被回收时), 打印经过的时间, 或者调用 timer.x 打印一次
-
timeit.default_timer 比 time.time 更合理
-
具体用法见类文档中的实例.
-
PS: 这个类写的时候还没有 PySnooper 这个库, 否则就会参考它里面的方式来实现, 毕竟直接在栈信息里到处转悠还是比较容易出错的.
class Timer(object):
    """Usage: init a Timer anywhere, such as the head of a function or the
    head of a module; it logs the elapsed time when garbage-collected
    (``__del__``), or immediately via ``timer.x``.

    :param name: used in the log message; auto-built from the caller's frame
        if omitted.
    :param log_func: some function to show the progress; print_info by default.
    :param default_timer: `timeit.default_timer` by default.
    :param rounding: None, or seconds will be round(xxx, rounding).
    :param readable: None, or use `timepass`: readable(cost_seconds) -> 00:00:01,234.
    :param log_after_del: whether to log once on garbage collection.
    :param stack_level: how many frames up to look for the caller's name.

    ::

        from torequests.utils import Timer
        import time
        Timer()

        @Timer.watch()
        def test(a=1):
            Timer()
            time.sleep(1)

            def test_inner():
                t = Timer('test_non_del')
                time.sleep(1)
                t.x

            test_inner()

        test(3)
        time.sleep(1)
        # [2018-03-10 02:16:48]: Timer [00:00:01]: test_non_del, start at 2018-03-10 02:16:47.
        # [2018-03-10 02:16:48]: Timer [00:00:02]: test(a=3), start at 2018-03-10 02:16:46.
        # [2018-03-10 02:16:48]: Timer [00:00:02]: test(3), start at 2018-03-10 02:16:46.
        # [2018-03-10 02:16:49]: Timer [00:00:03]: <module>: __main__ (temp_code.py), start at 2018-03-10 02:16:46.
    """

    def __init__(
        self,
        name=None,
        log_func=None,
        default_timer=None,
        rounding=None,
        readable=None,
        log_after_del=True,
        stack_level=1,
    ):
        readable = readable or timepass
        # keep False until fully initialized, so a failing __init__ does not
        # trigger a log from __del__
        self._log_after_del = False
        self.start_at = time.time()
        # unique key for injecting self into the caller's namespace
        uid = md5("%s%s" % (self.start_at, id(self)))
        if not name:
            # build a name from the caller's frame: function name plus its
            # local variables (or module/file info at module level)
            f_name = sys._getframe(stack_level).f_code.co_name
            f_local = sys._getframe(stack_level).f_locals
            if f_name == "<module>":
                f_vars = ": %s (%s)" % (
                    f_local.get("__name__"),
                    os.path.split(f_local.get("__file__"))[-1],
                )
                # f_vars = f_vars.replace(' __main__', '')
            else:
                f_vars = (
                    "(%s)"
                    % ", ".join(
                        [
                            "%s=%s" % (i, repr(f_local[i]))
                            for i in sorted(f_local.keys())
                        ]
                    )
                    if f_local
                    else "()"
                )
            if self not in f_local.values():
                # add self to name space for __del__ way: tie this Timer's
                # lifetime to the caller's frame so gc fires __del__ when the
                # frame dies.
                # NOTE(review): mutating f_locals of a *function* frame is not
                # guaranteed to persist in CPython -- confirm this works as
                # intended outside module level.
                sys._getframe(stack_level).f_locals.update(**{uid: self})
            name = "%s%s" % (f_name, f_vars)
        self.name = name
        self.log_func = log_func
        self.timer = default_timer or timeit.default_timer
        self.rounding = rounding
        self.readable = readable
        self.start_timer = self.timer()
        self._log_after_del = log_after_del

    @property
    def string(self):
        """Only return the expect_string quietly (no logging)."""
        return self.tick()

    @property
    def x(self):
        """Call self.log_func(self) (or print_info) and return expect_string."""
        # mark as logged so __del__ does not log a second time
        self._log_after_del = False
        passed_string = self.string
        if self.log_func:
            self.log_func(self)
        else:
            print_info(
                "Timer [%(passed)s]: %(name)s, start at %(start)s."
                % (
                    dict(
                        name=self.name, start=ttime(self.start_at), passed=passed_string
                    )
                )
            )
        return passed_string

    @property
    def passed(self):
        """Return the cost_seconds after starting up."""
        return self.timer() - self.start_timer

    def tick(self):
        """Return the time cost string as expected (rounded/readable)."""
        string = self.passed
        if self.rounding:
            string = round(string)
        if self.readable:
            string = self.readable(string)
        return string

    @staticmethod
    def watch(*timer_args, **timer_kwargs):
        """Decorator for Timer: logs how long each call of the wrapped
        function takes, naming the timer after the call's arguments."""

        def wrapper(function):
            @wraps(function)
            def inner(*args, **kwargs):
                args1 = ", ".join(map(repr, args)) if args else ""
                kwargs1 = ", ".join(
                    ["%s=%s" % (i, repr(kwargs[i])) for i in sorted(kwargs.keys())]
                )
                arg = ", ".join(filter(None, [args1, kwargs1]))
                name = "%s(%s)" % (function.__name__, arg)
                # local Timer logs via __del__ when `inner` returns
                _ = Timer(name=name, *timer_args, **timer_kwargs)
                result = function(*args, **kwargs)
                return result

            return inner

        return wrapper

    def __del__(self):
        if self._log_after_del:
            # not be called by self.x yet.
            self.x

    def __enter__(self):
        return self

    def __exit__(self, *args):
        # context-manager use logs once on exit
        self.x
-
-
简简单单的剪贴板辅助工具
-
用途很明显, 开启剪贴板 watcher, 会在剪贴板内容发生改变的时候触发 callback
-
默认 callback 为原封不动打印内容+换行
-
支持异步调用, 即不阻塞主线程.
class ClipboardWatcher(object):
    """Watch clipboard with `pyperclip`, run callback while changed.

    :param interval: polling period in seconds.
    :param callback: called with the new clipboard text on each change;
        defaults to :meth:`default_callback` (print text plus newline).
    """

    def __init__(self, interval=0.2, callback=None):
        # NOTE(review): assumes try_import returns the imported pyperclip
        # module (or a usable stub) -- confirm against torequests.utils.
        self.pyperclip = try_import("pyperclip")
        self.interval = interval
        self.callback = callback or self.default_callback
        # last seen clipboard content, used to detect changes
        self.temp = self.current

    def read(self):
        """Return the current clipboard content."""
        return self.pyperclip.paste()

    def write(self, text):
        """Rewrite the current clipboard content."""
        return self.pyperclip.copy(text)

    @property
    def current(self):
        """Return the current clipboard content."""
        return self.read()

    def default_callback(self, text):
        """Default clean the \\n in text."""
        # normalize CRLF and append a trailing newline before printing
        text = text.replace("\r\n", "\n")
        text = "%s\n" % text
        flush_print(text, sep="", end="")
        return text

    def watch(self, limit=None, timeout=None):
        """Block method to watch the clipboard changing.

        :param limit: stop after this many changes (None = never stop).
        :param timeout: overall deadline in seconds (falsy = no deadline).
        """
        start_time = time.time()
        count = 0
        while not timeout or time.time() - start_time < timeout:
            new = self.read()
            if new != self.temp:
                count += 1
                self.callback(new)
                if count == limit:
                    break
                self.temp = new
            time.sleep(self.interval)

    @property
    def x(self):
        """Return self.watch()"""
        return self.watch()

    @threads(1)
    def watch_async(self, limit=None, timeout=None):
        """Non-block method to watch the clipboard changing."""
        return self.watch(limit=limit, timeout=timeout)
-
-
常用的本地数据持久化功能
-
用途很简单, 就是想通过 key value 的方式在本地存放一点数据
-
之前我比较习惯用的是 sqlitedict 这个库, 性能没的说, 不过老得清理 sqlite 缓存 (VACUUM)
-
默认路径会放在 Windows / linux 的 User 目录下, 支持 pickle 存储和 json 存储
-
早期经常用这个来存储配置文件或者做简单的本地缓存, 现在用的时候少了, 毕竟没解决好多进程的竞态问题
class Saver(object):
    """Simple object persistence toolkit with pickle/json, if only you don't
    care about performance and security.

    **Do not set keys that start with "_"** (they collide with internals).

    :param path: if not set, will be ~/_saver.db; print(self._path) to show it.
        Set pickle's protocol < 3 for compatibility between python2/3,
        but use -1 for performance and some other optimizations.
    :param save_mode: pickle / json.
    :param auto_backup: also keep a ``<path>.bk`` copy after every save.

    >>> ss = Saver()
    >>> ss._path
    '/home/work/_saver.json'
    >>> ss.a = 1
    >>> ss['b'] = 2
    >>> str(ss)
    "{'a': 1, 'b': 2}"
    >>> del ss.b
    >>> ss._update({'c': 3, 'd': 4})
    >>> ss
    Saver(path="/home/work/_saver.json"){'a': 1, 'c': 3, 'd': 4}
    """

    # BORG-style registries: one shared instance and one lock per path
    _instances = {}
    _locks = {}
    # attribute names routed to real object attributes instead of the cache
    _protected_keys = {
        "_auto_backup",
        "_lock",
        "_path",
        "_saver_args",
        "_save_mode",
        "_cache",
        "__getitem__",
        "_keys",
        "_values",
        "__getattr__",
        "__len__",
        "_popitem",
        "_shutdown",
        "__setitem__",
        "__delitem__",
        "_save_obj",
        "_get",
        "__dict__",
        "_clear",
        "_locks",
        "__weakref__",
        "_items",
        "__module__",
        "_pop",
        "__contains__",
        "_load",
        "_save",
        "_update",
        "_set",
        "_protected_keys",
        "_instances",
        "_get_home_path",
        "_save_back_up",
    }
    _protected_keys = _protected_keys | set(object.__dict__.keys())

    def __new__(cls, path=None, save_mode="json", auto_backup=False, **saver_args):
        # BORG
        path = path or cls._get_home_path(save_mode=save_mode)
        return cls._instances.setdefault(path, super(Saver, cls).__new__(cls))

    def __init__(self, path=None, save_mode="json", auto_backup=False, **saver_args):
        super(Saver, self).__init__()
        self._auto_backup = auto_backup
        # NOTE(review): the lock is keyed by the raw `path` argument (possibly
        # None) while _path may be the computed default -- two Savers created
        # with path=None and with the explicit default path would get
        # different locks; confirm whether that is intended.
        self._lock = self.__class__._locks.setdefault(path, Lock())
        self._path = path or self._get_home_path(save_mode=save_mode)
        self._saver_args = saver_args
        self._save_mode = save_mode
        self._cache = self._load()

    @classmethod
    def _get_home_path(cls, save_mode=None):
        """Build the default storage path in the user's home directory."""
        home = os.path.expanduser("~")
        if save_mode == "json":
            ext = "json"
        elif save_mode == "pickle":
            ext = "pkl"
        else:
            ext = "db"
        file_name = "_saver.%s" % ext
        path = os.path.join(home, file_name)
        return path

    def _save_back_up(self):
        """Copy the storage file to <path>.bk."""
        with open(self._path, "rb") as f_raw:
            with open(self._path + ".bk", "wb") as f_bk:
                f_bk.write(f_raw.read())

    def _save_obj(self, obj):
        """Serialize `obj` to disk under the per-path lock; return `obj`."""
        mode = "wb" if self._save_mode == "pickle" else "w"
        with self._lock:
            with open(self._path, mode) as f:
                if self._save_mode == "json":
                    json.dump(obj, f, **self._saver_args)
                if self._save_mode == "pickle":
                    pickle.dump(obj, f, **self._saver_args)
            if self._auto_backup:
                self._save_back_up()
            return obj

    def _load(self):
        """Read the cache dict from disk, creating an empty file if missing."""
        if not (os.path.isfile(self._path) and os.path.getsize(self._path)):
            cache = {}
            self._save_obj(cache)
            return cache
        mode = "rb" if self._save_mode == "pickle" else "r"
        with self._lock:
            with open(self._path, mode) as f:
                if self._save_mode == "json":
                    return json.load(f)
                if self._save_mode == "pickle":
                    return pickle.load(f)

    def _save(self):
        return self._save_obj(self._cache)

    def _set(self, key, value):
        if self._save_mode == "json":
            try:
                # probe serializability; fall back to str() for bad types
                json.dumps(value)
            except TypeError:
                Config.utils_logger.warning(
                    "Saver._set(%s, %s) failed: bad type, using str(value) instead."
                    % (key, value)
                )
                value = str(value)
        self._cache[key] = value
        self._save()

    def _get(self, key, default=None):
        return self._cache.get(key, default)

    def __setattr__(self, key, value):
        # protected keys become real attributes; everything else is persisted
        if key in self._protected_keys:
            object.__setattr__(self, key, value)
        else:
            self._set(key, value)

    def __getattr__(self, key):
        if key in self._protected_keys:
            return object.__getattribute__(self, key)
        return self._get(key)

    def __contains__(self, key):
        return key in self._cache

    def __delattr__(self, key):
        self._cache.pop(key, None)
        self._save()

    def __dir__(self):
        return dir(object)

    def __len__(self):
        return len(self._cache)

    def _clear(self):
        """Drop all stored keys and persist the empty cache."""
        self._cache = {}
        self._save()

    def _shutdown(self):
        """Delete the storage file (and its backup, if auto_backup)."""
        if self._auto_backup:
            os.remove(self._path + ".bk")
        return os.remove(self._path)

    def _keys(self):
        return self._cache.keys()

    def _items(self):
        return self._cache.items()

    def _values(self):
        return self._cache.values()

    def _pop(self, key, default=None):
        result = self._cache.pop(key, default)
        self._save()
        return result

    def _popitem(self):
        result = self._cache.popitem()
        self._save()
        return result

    def _update(self, *args, **kwargs):
        self._cache.update(*args, **kwargs)
        self._save()

    def __getitem__(self, key):
        if key in self._cache:
            return self._get(key)
        raise KeyError

    def __setitem__(self, key, value):
        self._set(key, value)

    def __delitem__(self, key):
        self._cache.pop(key, None)
        self._save()

    def __str__(self):
        return str(self._cache)

    def __repr__(self):
        return 'Saver(path="%s")%s' % (self._path, reprlib.repr(self._cache))
-
-
估算一大串无序数字的平均间隔
-
当初写这个的使用场景实际是在一万多篇文章的发布时间里, 找出这个作者的发布规律, 因为只是估算个大概, 所以不想导入机器学习相关的第三方库, 就简单实现了一个
-
计算方法: 先对序列排序, 然后相邻数字求差, 将差值序列排序取中位数
def guess_interval(nums, accuracy=0):
    """Estimate the typical interval of a sequence of numbers: sort them,
    take the gaps between neighbours that are >= `accuracy`, and return the
    median gap.

    Returns 0 for an empty sequence, or when every gap is smaller than
    `accuracy`; returns the value itself for a one-element sequence.

    ::

        from torequests.utils import guess_interval
        import random

        seq = [random.randint(1, 100) for i in range(20)]
        print(guess_interval(seq, 5))
        # sorted_seq: [2, 10, 12, 19, 19, 29, 30, 32, 38, 40, 41, 54, 62, 69, 75, 79, 82, 88, 97, 99]
        # diffs: [8, 7, 10, 6, 13, 8, 7, 6, 6, 9]
        # median: 8
    """
    if not nums:
        return 0
    nums = sorted([int(i) for i in nums])
    if len(nums) == 1:
        return nums[0]
    diffs = [nums[i + 1] - nums[i] for i in range(len(nums) - 1)]
    diffs = [item for item in diffs if item >= accuracy]
    if not diffs:
        # bug fix: every gap was filtered out by `accuracy`; indexing the
        # empty list used to raise IndexError here
        return 0
    sorted_diff = sorted(diffs)
    return sorted_diff[len(sorted_diff) // 2]
-
-
将字符串按指定方式解析成多维列表
-
用的比较少, 用法见 doc 吧
-
主要就是把多层 for 循环的代码稍微节省一下
import re


def _re_split_mixin(string, sep, reg=False):
    """Split `string` by `sep` as a regex when reg is truthy, else plainly."""
    if reg:
        return re.split(sep, string)
    else:
        return string.split(sep)


def split_n(string, seps, reg=False):
    r"""Split a string into an n-dimensional list, one level per separator.

    :param string: the source string.
    :param seps: a sequence of separators, outermost level first.
    :param reg: treat every separator as a regex pattern.

    ::

        from torequests.utils import split_n

        ss = '''a b c  d e f  1 2 3  4 5 6
        a b c  d e f  1 2 3  4 5 6
        a b c  d e f  1 2 3  4 5 6'''

        print(split_n(ss, ('\n', '  ', ' ')))
        # [[['a', 'b', 'c'], ['d', 'e', 'f'], ['1', '2', '3'], ['4', '5', '6']], ...]
        print(split_n(ss, ['\s+'], reg=1))
        # ['a', 'b', 'c', 'd', 'e', 'f', '1', '2', '3', '4', '5', '6', ...]
    """
    if not seps:
        return string
    # bug fix: `reg` used to be dropped on recursion, so nested levels
    # silently fell back to plain str.split even when reg was truthy
    return [
        split_n(piece, seps[1:], reg=reg)
        for piece in _re_split_mixin(string, seps[0], reg=reg)
    ]
-
-
将一个函数丢到后台执行, 并且主线程如果跑完, 不要等这个线程完成就可以结束
-
说白了就是将一个同步阻塞的函数, 丢到一个新线程里跑
-
默认情况下, 这个线程没跑完, 主线程是无法退出的
-
但是如果设置了 Thread.daemon = True, 主线程结束时不需要阻塞等待该线程结束
-
这个是有点水… 还不如 atexit 有用…
def bg(func):
    """Run a function in a background daemon thread; the main thread can
    exit without waiting for it (thread.daemon=True).

    :param func: the function to run in the background.
    :return: a wrapper that starts the thread and returns the Thread object.

    ::

        from torequests.utils import bg, print_info
        import time

        def test1(n):
            time.sleep(n)
            print_info(n, 'done')

        @bg
        def test2(n):
            time.sleep(n)
            print_info(n, 'done')

        test3 = bg(test1)
        test2(1)
        test3(1)
        print_info('not be blocked')
        time.sleep(2)
        # [2018-06-12 23:46:19](L81): not be blocked
        # [2018-06-12 23:46:20](L81): 1 done
        # [2018-06-12 23:46:20](L81): 1 done
    """

    @wraps(func)
    def wrapper(*args, **kwargs):
        t = Thread(target=func, args=args, kwargs=kwargs)
        # bug fix: this line was commented out, which contradicted the
        # docstring and made every wrapped call block interpreter exit
        t.daemon = True
        t.start()
        return t

    return wrapper
-
-
一个简单的倒计时
-
正常情况下是阻塞主线程并不换行打印剩余秒数.
-
常规用途是在爬虫的两次请求之间显示剩余时间
def countdown(
    seconds=None,
    block=True,
    interval=1,
    daemon=True,
    tick_callback=None,
    finish_callback=None,
):
    """Count down in a worker thread, reporting each tick via ``tick_callback``.

    Similar to threading.Timer, but shows progress tick by tick.
    ``seconds`` may be an int-like value or a time string parsed by ``ptime``.

    ::

        from torequests.utils import countdown

        countdown(3)
        # 3 2 1
        # countdown finished [3 seconds]: 2018-06-13 00:12:55 => 2018-06-13 00:12:58.
        countdown('2018-06-13 00:13:29')
        # 10 9 8 7 6 5 4 3 2 1
        # countdown finished [10 seconds]: 2018-06-13 00:13:18 => 2018-06-13 00:13:28.
    """

    def _default_tick(s, seconds, *args):
        flush_print(s, sep="", end=" ")

    def _default_finish(seconds, start_time):
        flush_print()

    def _run(total, step):
        # tick_callback / finish_callback are resolved closures by the time
        # the worker thread starts.
        for remain in range(total, 0, -step):
            tick_callback(remain, total, step)
            time.sleep(step)
        if callable(finish_callback):
            finish_callback(total, start_time)

    start_time = time.time()
    tick_callback = tick_callback or _default_tick
    if finish_callback is None:
        finish_callback = _default_finish
    if unicode(seconds).isdigit():
        seconds = int(seconds)
    elif isinstance(seconds, (unicode, str)):
        # a time string: count down until that moment
        seconds = int(ptime(seconds) - time.time())
    worker = Thread(target=_run, args=(seconds, interval))
    worker.daemon = daemon
    worker.start()
    if block:
        worker.join()
-
-
乞丐版进度条
-
就是想做个不需要太多依赖, 只要稍微看看大体进度
-
用法见 Basic Usage
def flush_print(*args, **kwargs):
    """print-like helper that flushes stdout; does not support `file`.

    :param sep: separator between args, space by default.
    :param end: '\\n' by default.
    :param flush: True by default.

    ::

        import time
        from torequests.utils import flush_print

        flush_print("=" * 10)
        for _ in range(10):
            time.sleep(0.2)
            flush_print("=", sep="", end="")
    """
    # keyword-only args (PY3 `*, sep=...`) are a SyntaxError on PY2,
    # hence the **kwargs signature.
    sep = kwargs.pop("sep", " ")
    end = kwargs.pop("end", "\n")
    flush = kwargs.pop("flush", 1)
    text = sep.join(unicode(arg) for arg in args)
    sys.stdout.write("%s%s" % (text, end))
    if flush:
        sys.stdout.flush()


class ProgressBar(object):
    """Minimal-dependency progress bar.

    :param size: total count of ProgressBar.x calls expected.
    :param length: how many characters the finished bar occupies.
    :param sig: the character printed for each chunk of progress.

    Basic Usage::

        pb = ProgressBar(50, 10)
        for _ in range(50):
            time.sleep(0.1)
            pb.x
        print("current completion rate:", pb.completion_rate)
        # ==========
        # ==========
        # current completion rate: 1.0
    """

    def __init__(self, size, length=100, sig="="):
        self.size = size or 0
        self.length = length
        self.sig = sig
        self.current = 0
        self.last_print = 0
        self.printed = 0
        if size:
            # Fraction keeps the chunk exact; float division would drift
            # when size is not a multiple of length.
            self.chunk = Fraction(self.size, self.length)
            flush_print(self.sig * self.length)
        else:
            self.chunk = 1

    def add(self, step):
        # step is expected to be >= 0
        self.current += step
        pending = int((self.current - self.last_print) / self.chunk)
        if pending < 1:
            return self.printed
        self.printed += pending
        flush_print(self.sig * pending, end="")
        self.last_print = self.last_print + pending * self.chunk
        if self.current == self.size:
            flush_print()
        return self.printed

    @property
    def x(self):
        return self.add(1)

    @property
    def completion_rate(self):
        return self.current / self.size
-
-
像 JS 那样的正则 match
-
也就是匹配到就匹配, 不然就给空字符串, 只要别报错就好
-
也可以注册到 re 库里去
-
用法见 Basic Usage
class RegMatch(object):
    """JS-like wrapper around a re match object.

    Index with an int to read groups; a failed match or a missing group
    yields '' instead of raising."""

    def __init__(self, item):
        self.item = item

    def __getattr__(self, key, default=null):
        return getattr(self.item, key, default)

    def __getitem__(self, index):
        match = self.item
        if match is None:
            return ""
        if not isinstance(index, int):
            raise IndexError
        try:
            return match.group(index)
        except IndexError:
            return ""

    @classmethod
    def find_one(cls, pattern, string, flags=0):
        """JS-like match object. Use index number to get groups, if not match
        or no group, will return ''.

        Basic Usage::

            >>> from torequests.utils import find_one
            >>> string = "abcd"
            >>> find_one("a.*", string)
            <torequests.utils.RegMatch object at 0x0705F1D0>
            >>> find_one("a.*", string)[0]
            'abcd'
            >>> find_one("a.*", string)[1]
            ''
            >>> find_one("a(.)", string)[0]
            'ab'
            >>> find_one("a(.)", string)[1]
            'b'
            >>> find_one("a(.)", string)[2] or "default"
            'default'
            >>> import re
            >>> item = find_one("a(B)(C)", string, flags=re.I | re.S)
            >>> item
            <torequests.utils.RegMatch object at 0x0705F1D0>
            >>> item[0]
            'abc'
            >>> item[1]
            'b'
            >>> item[2]
            'c'
            >>> item[3]
            ''
            >>> # import re
            >>> # re.findone = find_one
            >>> register_re_findone()
            >>> re.findone('a(b)', 'abcd')[1] or 'default'
            'b'
        """
        return cls(re.search(pattern, string, flags=flags))


def register_re_findone():
    """import re; re.findone = find_one"""
    re.findone = find_one


find_one = RegMatch.find_one
-
-
线程安全的冷却池
-
当初为了冷却代理避免反爬做的
-
初始化时指定一个 interval, 也就是没冷却够这个秒数的, 无法被取出
-
主要依靠的就是默认的 PriorityQueue, 天生线程安全
-
为了减少遍历, 所以每次按照上次使用时间来排序, 然后丢到队列里, 即越久未使用越靠前
-
队列头部的元素如果距离上次使用时间不超过 interval, 那么就按这个的差值 sleep
class TimeItem(object):
    """Priority-queue entry for Cooldown: wraps a payload with its last-use
    timestamp. All comparisons order purely by ``use_at`` so the
    PriorityQueue pops the least-recently-used item first."""

    __slots__ = ('data', 'use_at')

    def __init__(self, data, use_at):
        self.data = data
        self.use_at = use_at

    def __hash__(self):
        return hash(self.data)

    def __gt__(self, other):
        return self.use_at > other.use_at

    def __ge__(self, other):
        return self.use_at >= other.use_at

    def __lt__(self, other):
        return self.use_at < other.use_at

    def __le__(self, other):
        return self.use_at <= other.use_at

    def __eq__(self, other):
        return self.use_at == other.use_at

    def __ne__(self, other):
        return self.use_at != other.use_at


class Cooldown(object):
    """Thread-safe Cooldown toolkit (backed by PriorityQueue).

    Items cannot be taken out again until ``interval`` seconds have passed
    since their last use.

    :param init_items: iterables to add into the default queue at first.
    :param interval: each item will cooldown `interval` seconds before return.
    :param born_at_now: if be set True, the item.use_at will be set
        time.time() instead of 0 when adding to queue at the first time.

    >>> from torequests.logs import print_info
    >>> cd = Cooldown(range(1, 3), interval=2)
    >>> cd.add_items([3, 4])
    >>> cd.add_item(5)
    >>> for _ in range(7):
    ...     print_info(cd.get(1, 'timeout'))
    [2019-01-17 01:50:59] pyld.py(152): 1
    [2019-01-17 01:50:59] pyld.py(152): 3
    [2019-01-17 01:50:59] pyld.py(152): 5
    [2019-01-17 01:50:59] pyld.py(152): 2
    [2019-01-17 01:50:59] pyld.py(152): 4
    [2019-01-17 01:51:00] pyld.py(152): timeout
    [2019-01-17 01:51:01] pyld.py(152): 1
    >>> cd.size
    5
    """

    def __init__(self, init_items=None, interval=0, born_at_now=False):
        self.interval = interval
        self.queue = PriorityQueue()
        # use_at=0 means "never used": such items are immediately available.
        self.use_at_function = self.get_now_timestamp if born_at_now else lambda: 0
        self.add_items(init_items or [])

    @property
    def size(self):
        return self.queue.qsize()

    @property
    def all_items(self):
        return [item.data for item in self.queue.queue]

    def get_now_timestamp(self):
        return time.time()

    def add_item(self, item):
        if not isinstance(item, TimeItem):
            item = TimeItem(item, self.use_at_function())
        self.queue.put(item)

    def add_items(self, items):
        for item in items:
            self.add_item(item)

    def remove_item(self, item):
        self.queue.queue = [i for i in self.queue.queue if i.data != item]
        return self.queue.qsize()

    def remove_items(self, items):
        # Fix: the original filter was `if i.data in items`, which KEPT exactly
        # the items that should have been removed (inverse of remove_item).
        self.queue.queue = [i for i in self.queue.queue if i.data not in items]
        return self.queue.qsize()

    def get(self, timeout=None, default=None):
        """Pop the least-recently-used item that has cooled down, or
        ``default`` if none becomes available within ``timeout`` seconds."""
        try:
            start_time = time.time()
            if timeout is None:
                timeout = float('inf')
            while time.time() - start_time < timeout:
                item = self.queue.get(timeout=timeout)
                if time.time() - item.use_at < self.interval:
                    # still cooling down: put it back and sleep the remainder
                    # (the head of the queue is the oldest, so nothing else
                    # can be ready sooner).
                    self.queue.put(item)
                    wait_time = self.interval - (time.time() - item.use_at)
                    wait_time = min((wait_time, timeout))
                    time.sleep(wait_time)
                    continue
                item.use_at = self.get_now_timestamp()
                self.queue.put(item)
                return item.data
            else:
                return default
        except Empty:
            return default
-
-
将 python2 和 python3 里常用的解析 URL 的库的名字统一化
-
大部分工作其实 requests.compat 都做了
if PY2: import repr as reprlib from Queue import Empty, PriorityQueue from urllib import quote, quote_plus, unquote_plus from urlparse import ( parse_qs, parse_qsl, urlparse, unquote, urljoin, urlsplit, urlunparse, ) from cgi import escape import HTMLParser unescape = HTMLParser.HTMLParser().unescape if PY3: import reprlib from urllib.parse import ( parse_qs, parse_qsl, urlparse, quote, quote_plus, unquote, unquote_plus, urljoin, urlsplit, urlunparse, ) from html import escape, unescape from queue import Empty, PriorityQueue unicode = str
-
-
压力测试 / 检测反爬频率限制
-
这个就不多提了, 不太道德, 违反了爬虫不敲打服务器的原则, 甚至会因为 dos 获罪
- 但是如果限制参数 n=1, interval=30, 则会变为 30 秒请求一次, 可以用来做连续几天的测试, 通过日志可以发现服务器那边的反爬安全频率.
- 找出安全频率的代码还没写完, 实际上也就是从一个很大的 interval (比如60s) 开始, 每隔一段时间(比如100次请求, 或者一个小时) 将 interval 减少百分之一或者十分之一, 直到触发反爬导致请求结果失败或发生变化.
-
这个的速度因为要做 md5 等操作, 性能打了很大折扣, 直接使用 torequests.Requests (基于 aiohttp ) 的情况下可以到 1500 qps, golang 原生可以到 6000 qps…
from torequests.crawlers import StressTest

# Hammer a local endpoint with n=100 concurrency and a 2s timeout per request.
# NOTE(review): `.x` presumably triggers the run and blocks while logging
# per-request throughput as shown below — confirm against torequests.crawlers.
StressTest('http://localhost:8080', n=100, timeout=2).x
# [2019-10-25 20:12:01] crawlers.py(485): [17496] response: 8389296c-2, start at 2019-10-25 20:11:38 (+00:00:23), 0.131s, 754.92 req/s [100.00 %]
# [2019-10-25 20:12:01] crawlers.py(485): [17497] response: 8389296c-2, start at 2019-10-25 20:11:38 (+00:00:23), 0.131s, 754.96 req/s [100.00 %]
# [2019-10-25 20:12:01] crawlers.py(485): [17498] response: 8389296c-2, start at 2019-10-25 20:11:38 (+00:00:23), 0.132s, 754.97 req/s [100.00 %]
# [2019-10-25 20:12:01] crawlers.py(485): [17499] response: 8389296c-2, start at 2019-10-25 20:11:38 (+00:00:23), 0.132s, 755.02 req/s [100.00 %]
# [2019-10-25 20:12:01] crawlers.py(485): [17500] response: 8389296c-2, start at 2019-10-25 20:11:38 (+00:00:23), 0.131s, 755.03 req/s [100.00 %]
-
-
净化一个复杂的请求, 只保留最简洁的参数
-
源代码有些脏, 就不展示了, 不是很优雅, 不过可以用
-
怕反爬就限制一下频率
-
实现原理就是得到全部 Request 参数, 然后一个个去掉, 如果去掉了某个导致 callback 结果 (默认是 md5 resp.content) 发生变化, 则该参数不是可忽略的参数
from torequests.crawlers import CleanRequest

# Raw curl command copied from the browser devtools.
request = '''curl 'https://p.3.cn?skuIds=1&nonsense=1&nonce=0' -H 'Pragma: no-cache' -H 'DNT: 1' -H 'Accept-Encoding: gzip, deflate' -H 'Accept-Language: zh-CN,zh;q=0.9' -H 'Upgrade-Insecure-Requests: 1' -H 'User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/63.0.3239.84 Safari/537.36' -H 'Accept: text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8' -H 'Cache-Control: no-cache' -H 'Referer: https://p.3.cn?skuIds=1&nonsense=1&nonce=0' -H 'Cookie: ASPSESSIONIDSQRRSADB=MLHDPOPCAMBDGPFGBEEJKLAF' -H 'Connection: keep-alive' --compressed'''

# CleanRequest re-sends the request while dropping params/headers one by one;
# any item whose removal leaves the callback result (default: md5 of
# resp.content) unchanged is considered ignorable, leaving a minimal request.
c = CleanRequest(request)
print(c.x)
{'url': 'https://p.3.cn', 'method': 'get'}
-
总结
大部分代码都是学习过程中, 遇到的一些常见问题做的练习, 活到老学到老.
一句话送给喜欢自学的人:
欲修其身者,先正其心;欲正其心者,先诚其意;欲诚其意者,先致其知,致知在格物.
至诚之道, 可以前知.