Preface

A 150-word introduction is omitted here; let's pretend it will be added later.

Main Text

All of the features below have been folded into torequests:

pip install torequests -U

  1. Check the current process's memory usage, with the help of psutil

    1. Code

      def print_mem(unit="MB"):
          """Show the proc-mem-cost with psutil, use this only for lazinesssss.
            
          :param unit: B, KB, MB, GB.
          """
          try:
              import psutil, os
            
              B = float(psutil.Process(os.getpid()).memory_info().vms)
              KB = B / 1024
              MB = KB / 1024
              GB = MB / 1024
              result = vars()[unit]
              print("memory usage: %.2f(%s)" % (result, unit))
              return result
          except ImportError:
              print("pip install psutil first.")
            
            
      print_mem()  # memory usage: 8.93(MB)
      
  2. Convert a cURL command into a human-readable dict of arguments, compatible with requests.request

    1. Benefits of replaying the raw request

      1. The request will not fail because of missing parameters such as Referer, User-Agent, Accept, etc.
      2. The parsed result can be passed straight to requests.request(**result) to send the request
    2. Code

      from __future__ import print_function

      import shlex
      import argparse
      import sys

      # `PY2` and `print_info` are defined inside torequests; minimal stand-ins keep this snippet runnable:
      PY2 = sys.version_info[0] == 2
      print_info = print
            
            
      class _Curl:
          """Curl args parser.
            
          **Use curlparse function directly.**
          """
            
          parser = argparse.ArgumentParser()
          parser.add_argument("curl")
          parser.add_argument("url")
          parser.add_argument("-X", "--method", default="get")
          parser.add_argument("-A", "--user-agent")
          parser.add_argument("-u", "--user")  # <user[:password]>
          parser.add_argument("-x", "--proxy")  # proxy.com:port
          parser.add_argument("-d", "--data")
          parser.add_argument("-F", "--form")
          parser.add_argument("--data-binary")
          parser.add_argument("--connect-timeout", type=float)
          parser.add_argument(
              "-H", "--header", action="append", default=[])  # key: value
          parser.add_argument("--compressed", action="store_true")
            
            
      def curlparse(string, encoding="utf-8"):
          """Translate curl-string into dict of request.
              :param string: standard curl-string, like `r'''curl ...'''`.
              :param encoding: encoding for post-data encoding.
            
          Basic Usage::
            
            >>> from torequests.utils import curlparse
            >>> curl_string = '''curl 'https://p.3.cn?skuIds=1&nonsense=1&nonce=0' -H 'Pragma: no-cache' -H 'DNT: 1' -H 'Accept-Encoding: gzip, deflate' -H 'Accept-Language: zh-CN,zh;q=0.9' -H 'Upgrade-Insecure-Requests: 1' -H 'User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/63.0.3239.84 Safari/537.36' -H 'Accept: text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8' -H 'Cache-Control: no-cache' -H 'Referer: https://p.3.cn?skuIds=1&nonsense=1&nonce=0' -H 'Cookie: ASPSESSIONIDSQRRSADB=MLHDPOPCAMBDGPFGBEEJKLAF' -H 'Connection: keep-alive' --compressed'''
            >>> request_args = curlparse(curl_string)
            >>> request_args
            {'url': 'https://p.3.cn?skuIds=1&nonsense=1&nonce=0', 'headers': {'Pragma': 'no-cache', 'Dnt': '1', 'Accept-Encoding': 'gzip, deflate', 'Accept-Language': 'zh-CN,zh;q=0.9', 'Upgrade-Insecure-Requests': '1', 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/63.0.3239.84 Safari/537.36', 'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8', 'Cache-Control': 'no-cache', 'Referer': 'https://p.3.cn?skuIds=1&nonsense=1&nonce=0', 'Cookie': 'ASPSESSIONIDSQRRSADB=MLHDPOPCAMBDGPFGBEEJKLAF', 'Connection': 'keep-alive'}, 'method': 'get'}
            >>> import requests
            >>> requests.request(**request_args)
            <Response [200]>
          """
          assert "\n" not in string, 'curl-string should not contain \\n, try r"...".'
          if string.startswith("http"):
              return {"url": string, "method": "get"}
          try:
              lex_list = shlex.split(string.strip())
          except ValueError as e:
              if str(e) == 'No closing quotation' and string.count("'") % 2 != 0:
                  print_info(
                      "If `data` contains a single quote ('), wrap the `data` in double quotes "
                      "and escape any original double quote (\") with a backslash (\\)."
                  )
              raise e
          args, unknown = _Curl.parser.parse_known_args(lex_list)
          requests_args = {}
          headers = {}
          requests_args["url"] = args.url
          for header in args.header:
              key, value = header.split(":", 1)
              headers[key.title()] = value.strip()
          if args.user_agent:
              headers["User-Agent"] = args.user_agent
          if headers:
              requests_args["headers"] = headers
          if args.user:
              requests_args["auth"] = tuple(
                  u for u in args.user.split(":", 1) + [""])[:2]
          # if args.proxy:
          # pass
          data = args.data or args.data_binary or args.form
          if data:
              if data.startswith("$"):
                  data = data[1:]
              args.method = "post"
              if PY2:
                  # TODO not fix the UnicodeEncodeError, so use `replace`, damn python2.x.
                  data = data.replace(r'\r', '\r').replace(r'\n', '\n')
              else:
                  data = data.encode(
                      'latin-1',
                      'backslashreplace').decode('unicode-escape').encode(encoding)
              requests_args["data"] = data
          requests_args["method"] = args.method.lower()
          if args.connect_timeout:
              requests_args["timeout"] = args.connect_timeout
          return requests_args
            
            
      print(
          curlparse(
              r'''curl 'https://httpbin.org/get' -H 'Connection: keep-alive' -H 'Cache-Control: max-age=0' -H 'DNT: 1' -H 'Upgrade-Insecure-Requests: 1' -H 'User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/78.0.3904.70 Safari/537.36' -H 'Sec-Fetch-User: ?1' -H 'Accept: text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3' -H 'Sec-Fetch-Site: none' -H 'Sec-Fetch-Mode: navigate' -H 'Accept-Encoding: gzip, deflate, br' -H 'Accept-Language: zh-CN,zh;q=0.9' --compressed'''
          ))
      # {'url': 'https://httpbin.org/get', 'headers': {'Connection': 'keep-alive', 'Cache-Control': 'max-age=0', 'Dnt': '1', 'Upgrade-Insecure-Requests': '1', 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/78.0.3904.70 Safari/537.36', 'Sec-Fetch-User': '?1', 'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3', 'Sec-Fetch-Site': 'none', 'Sec-Fetch-Mode': 'navigate', 'Accept-Encoding': 'gzip, deflate, br', 'Accept-Language': 'zh-CN,zh;q=0.9'}, 'method': 'get'}
      
  3. Merge several iterables into a single generator

    1. Python 3 already provides this

      1. from itertools import chain

    2. Code

      def itertools_chain(*iterables):
          """For the shortage of Python2's, Python3: `from itertools import chain`."""
          for it in iterables:
              for element in it:
                  yield element
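
      # A quick usage check of the helper above:
      print(list(itertools_chain([1, 2], (3, 4), "ab")))
      # [1, 2, 3, 4, 'a', 'b']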
      
  4. Split a sequence into n equal pieces & split a sequence into chunks of length n

    1. Code

      from itertools import chain as itertools_chain
            
            
      def slice_by_size(seq, size):
          """Slice a sequence into chunks, return as a generation of chunks with `size`.  """
          filling = object()
          for it in zip(*(itertools_chain(seq, [filling] * size),) * size):
              if filling in it:
                  it = tuple(i for i in it if i is not filling)
              if it:
                  yield it
            
            
      def slice_into_pieces(seq, n):
          """Slice a sequence into `n` pieces, return a generation of n pieces"""
          length = len(seq)
          if length % n == 0:
              size = length // n
          else:
              size = length // n + 1
          for it in slice_by_size(seq, size):
              yield it
            
            
      n = 3
      items = list(range(10))
      chunks = slice_by_size(items, n)
      print(list(chunks))
      # [(0, 1, 2), (3, 4, 5), (6, 7, 8), (9,)]
      chunks = slice_into_pieces(items, n)
      print(list(chunks))
      # [(0, 1, 2, 3), (4, 5, 6, 7), (8, 9)]
      # ==================================
      # If performance does not matter, slice_by_size can also be written as simply as:
      chunks = (items[i:i + n] for i in range(0, len(items), n))
      print(list(chunks))
      # [[0, 1, 2], [3, 4, 5], [6, 7, 8], [9]]
      
  5. Simple timezone-aware conversion between timestamps and local time

    1. Code

      import time
            
      TIMEZONE = int(-time.timezone / 3600)
            
            
      def ttime(timestamp=None, tzone=None, fail="", fmt="%Y-%m-%d %H:%M:%S"):
          """Translate timestamp into human-readable: %Y-%m-%d %H:%M:%S.
            
          :param timestamp: the timestamp float, or `time.time()` by default.
          :param tzone: time compensation, int(-time.timezone / 3600) by default,
                      (can be set with TIMEZONE).
          :param fail: while raising an exception, return it.
          :param fmt: %Y-%m-%d %H:%M:%S, %z not work.
          :rtype: str
            
          >>> ttime()
          2018-03-15 01:24:35
          >>> ttime(1486572818.421858323)
          2017-02-09 00:53:38
          """
          tzone = TIMEZONE if tzone is None else tzone
          fix_tz = tzone * 3600
          if timestamp is None:
              timestamp = time.time()
          else:
              timestamp = float(timestamp)
              if 1e12 <= timestamp < 1e13:
                  # Compatible timestamp with 13-digit milliseconds
                  timestamp = timestamp / 1000
          try:
              timestamp = time.time() if timestamp is None else timestamp
              return time.strftime(fmt, time.gmtime(timestamp + fix_tz))
          except:
              return fail
            
            
      def ptime(timestr=None, tzone=None, fail=0, fmt="%Y-%m-%d %H:%M:%S"):
          """Translate %Y-%m-%d %H:%M:%S into timestamp.
            
          :param timestr: string like 2018-03-15 01:27:56, or time.time() if not set.
          :param tzone: time compensation, int(-time.timezone / 3600) by default,
                      (can be set with TIMEZONE).
          :param fail: while raising an exception, return it.
          :param fmt: %Y-%m-%d %H:%M:%S, %z not work.
          :rtype: int
            
              >>> ptime('2018-03-15 01:27:56')
              1521048476
          """
          tzone = TIMEZONE if tzone is None else tzone
          fix_tz = -(tzone * 3600 + time.timezone)
          #: str(timestr) for datetime.datetime object
          timestr = str(timestr or ttime())
          try:
              return int(time.mktime(time.strptime(timestr, fmt)) + fix_tz)
          except:
              return fail
            
            
      # current time in the local timezone
      print(ttime())  # 2019-10-25 19:12:59
      # current time in UTC (timezone 0)
      print(ttime(tzone=0))  # 2019-10-25 11:12:59
      # current local timestamp
      print(ptime())  # 1572001979
      print(ptime(ttime()))  # 1572001979
      
  6. Convert a number of seconds into a human-readable string or dict

    1. Code

      import time
            
            
      def split_seconds(seconds):
          """Split seconds into [day, hour, minute, second, ms]
            
              `divisor: 1, 24, 60, 60, 1000`
            
              `units: day, hour, minute, second, ms`
            
          >>> split_seconds(6666666)
          [77, 3, 51, 6, 0]
          """
          ms = seconds * 1000
          divisors = (1, 24, 60, 60, 1000)
          quotient, result = ms, []
          for divisor in divisors[::-1]:
              quotient, remainder = divmod(quotient, divisor)
              result.append(quotient) if divisor == 1 else result.append(remainder)
          return result[::-1]
            
            
      def timeago(seconds=0, accuracy=4, format=0, lang="en"):
          """Translate seconds into human-readable.
            
              :param seconds: seconds (float/int).
              :param accuracy: 4 by default (units[:accuracy]), determine the length of elements.
              :param format: index of [led, literal, dict].
              :param lang: en or cn.
              :param units: day, hour, minute, second, ms.
            
          >>> timeago(93245732.0032424, 5)
          '1079 days, 05:35:32,003'
          >>> timeago(93245732.0032424, 4, 1)
          '1079 days 5 hours 35 minutes 32 seconds'
          >>> timeago(-389, 4, 1)
          '-6 minutes 29 seconds 0 ms'
          """
          assert format in [0, 1,
                            2], ValueError("format arg should be one of 0, 1, 2")
          negative = "-" if seconds < 0 else ""
          seconds = abs(seconds)
          if lang == "en":
              units = ("day", "hour", "minute", "second", "ms")
          elif lang == "cn":
              units = (u"天", u"小时", u"分钟", u"秒", u"毫秒")
          times = split_seconds(seconds)
          if format == 2:
              return dict(zip(units, times))
            
          day, hour, minute, second, ms = times
            
          if format == 0:
              day_str = ("%d %s%s, " % (day, units[0],
                                        "s" if day > 1 and lang == "en" else "")
                         if day else "")
              mid_str = ":".join(("%02d" % i for i in (hour, minute, second)))
              if accuracy > 4:
                  mid_str += ",%03d" % ms
              return negative + day_str + mid_str
          elif format == 1:
              # find longest valid fields index (non-zero in front)
              valid_index = 0
              for x, i in enumerate(times):
                  if i > 0:
                      valid_index = x
                      break
              else:
                  valid_index = x
              result_str = [
                  "%d %s%s" % (num, unit, "s"
                               if lang == "en" and num > 1 and unit != "ms" else "")
                  for num, unit in zip(times, units)
              ][valid_index:][:accuracy]
              result_str = " ".join(result_str)
              return negative + result_str
            
            
      print(split_seconds(6666666.66))
      # [77.0, 3.0, 51.0, 6.0, 660.0]
      print(timeago(93245732.0032424, 5))
      # 1079 days, 05:35:32,003
      print(timeago(93245732.0032424, 4, 1))
      # 1079 days 5 hours 35 minutes 32 seconds
      print(timeago(-389, 4, 1))
      # -6 minutes 29 seconds 0 ms
      print(timeago(-389, 4, 1, 'cn'))
      # -6 分钟 29 秒 0 毫秒
      
  7. Convert an object to a string, encode it as UTF-8, and take its md5

    1. Code

      import hashlib
            
      if not isinstance(range(1), list):
          # range() returns a list only on Python 2, so this branch means Python 3
          unicode = str
            
            
      def md5(string, n=32, encoding="utf-8", skip_encode=False):
          """str(obj) -> md5_string
            
          :param string: string to operate.
          :param n: md5_str length.
            
          >>> from torequests.utils import md5
          >>> md5(1, 10)
          '923820dcc5'
          >>> md5('test')
          '098f6bcd4621d373cade4e832627b4f6'
          """
          todo = string if skip_encode else unicode(string).encode(encoding)
          if n == 32:
              return hashlib.md5(todo).hexdigest()
          elif isinstance(n, (int, float)):
              return hashlib.md5(todo).hexdigest()[(32 - n) // 2:(n - 32) // 2]
          elif isinstance(n, (tuple, list)):
              return hashlib.md5(todo).hexdigest()[n[0]:n[1]]
            
            
      print(md5('test'))
      # 098f6bcd4621d373cade4e832627b4f6
      print(md5([1, 2, 3]))
      # 49a5a960c5714c2e29dd1a7e7b950741
      print(md5(object))
      # 6abc48afdb859b639e0a0e1edd225a99
      
  8. A dead-simple counter; honestly it does not seem that useful in practice...

    1. Code

            
      class Counts(object):
          """Counter for counting the times been called
            
          >>> from torequests.utils import Counts
          >>> cc = Counts()
          >>> cc.x
          1
          >>> cc.x
          2
          >>> cc.now
          2
          >>> cc.current
          2
          >>> cc.sub()
          1
          """
            
          __slots__ = ("start", "step", "current", "total")
            
          def __init__(self, start=0, step=1):
              self.start = start
              self.step = step
              self.current = start
              self.total = -1
            
          def clear(self):
              self.current = self.start
            
          @property
          def x(self):
              return self.add()
            
          @property
          def s(self):
              return self.sub()
            
          @property
          def c(self):
              return self.x
            
          @property
          def now(self):
              return self.current
            
          def add(self, num=None):
              self.current += num or self.step
              return self.current
            
          def sub(self, num=None):
              self.current -= num or self.step
              return self.current
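
      # A tiny usage sketch: counting loop iterations with the class above.
      counter = Counts()
      for _ in range(3):
          counter.x
      print(counter.now)  # 3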
      
  9. Deduplicate a sequence, returning a generator that preserves the original order.

    1. The best-performing approach I have used so far is still membership checks against a `seen` set

    2. Code

      def unique(seq, key=None, return_as=None):
          """Unique the seq and keep the order.
            
          Instead of the slow way:
              `lambda seq: (x for index, x in enumerate(seq) if seq.index(x)==index)`
            
          :param seq: raw sequence.
          :param return_as: generator for default, or list / set / str...
            
          >>> from torequests.utils import unique
          >>> a = [1,2,3,4,2,3,4]
          >>> unique(a)
          <generator object unique.<locals>.<genexpr> at 0x05720EA0>
          >>> unique(a, return_as=str)
          '1234'
          >>> unique(a, return_as=list)
          [1, 2, 3, 4]
          """
          seen = set()
          add = seen.add
          if key:
              generator = (x for x in seq if key(x) not in seen and not add(key(x)))
          else:
              generator = (x for x in seq if x not in seen and not add(x))
          if return_as:
              if return_as == str:
                  return "".join(map(str, generator))
              else:
                  return return_as(generator)
          else:
              # python2 not support yield from
              return generator
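
      # The `key` argument is not shown in the docstring above: it dedupes by a computed value,
      # keeping the first occurrence (the data here is made up for illustration).
      rows = [{"id": 1, "v": "a"}, {"id": 1, "v": "b"}, {"id": 2, "v": "c"}]
      print(list(unique(rows, key=lambda r: r["id"])))
      # [{'id': 1, 'v': 'a'}, {'id': 2, 'v': 'c'}]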
      
  10. Register and look up functions by regular expression, for simple pattern matching

    1. In short: bind a function to a regex with a decorator, then look the function up later by matching a string.

    2. Common uses:

      1. Register a crawler's parse function under a regex, so the matching function runs whenever a URL fits the pattern (a short routing sketch follows the class below)

      2. Route lookup for an HTTP server

        import re

        # `PY3` (used by show_all) is torequests' Python-3 flag; `fuzzywuzzy` (used by fuzzy) needs `pip install fuzzywuzzy`.
        class Regex(object):
            """Register some objects(like functions) to the regular expression.
                  
            >>> from torequests.utils import Regex, re
            >>> reg = Regex()
            >>> @reg.register_function('http.*cctv.*')
            ... def mock():
            ...     pass
            ...
            >>> reg.register('http.*HELLOWORLD', 'helloworld', instances='http://helloworld', flags=re.I)
            >>> reg.register('http.*HELLOWORLD2', 'helloworld2', flags=re.I)
            >>> reg.find('http://cctv.com')
            [<function mock at 0x031FC5D0>]
            >>> reg.match('http://helloworld')
            ['helloworld']
            >>> reg.match('non-http://helloworld')
            []
            >>> reg.search('non-http://helloworld')
            ['helloworld']
            >>> len(reg.search('non-http://helloworld2'))
            2
            >>> print(reg.show_all())
            ('http.*cctv.*') =>  => <class 'function'> mock ""
            ('http.*HELLOWORLD', re.IGNORECASE) => http://helloworld => <class 'str'> helloworld
            ('http.*HELLOWORLD2', re.IGNORECASE) =>  => <class 'str'> helloworld2
            >>> reg.fuzzy('non-http://helloworld')
            [('http://helloworld', 95)]
            """
                  
            def __init__(self, ensure_mapping=False):
                """
                :param ensure_mapping: ensure mapping one to one, if False,
                 will return all(more than 1) mapped object list."""
                self.container = []
                self.ensure_mapping = ensure_mapping
                  
            def register(self, patterns, obj=None, instances=None, **reg_kwargs):
                """Register one object which can be matched/searched by regex.
                  
                :param patterns: a list/tuple/set of regex-pattern.
                :param obj: return it while search/match success.
                :param instances: instance list will search/match the patterns.
                :param reg_kwargs: kwargs for re.compile.
                """
                assert obj, "bool(obj) should be True."
                patterns = patterns if isinstance(patterns, (list, tuple, set)) else [patterns]
                instances = instances or []
                instances = (
                    instances if isinstance(instances, (list, tuple, set)) else [instances]
                )
                for pattern in patterns:
                    pattern_compiled = re.compile(pattern, **reg_kwargs)
                    self.container.append((pattern_compiled, obj, instances))
                    if self.ensure_mapping:
                        # check all instances to avoid one-to-many instances.
                        self._check_instances()
                    else:
                        # no need to check all instances.
                        for instance in instances:
                            assert self.search(instance) == [obj] or self.match(instance) == [
                                obj
                            ], (
                                "instance %s should fit at least one pattern %s"
                                % (instance, pattern)
                            )
                  
            def register_function(self, patterns, instances=None, **reg_kwargs):
                """Decorator for register."""
                  
                def wrapper(function):
                    self.register(patterns, function, instances=instances, **reg_kwargs)
                    return function
                  
                return wrapper
                  
            def find(self, string, default=None):
                """Return match or search result.
                  
                :rtype: list"""
                return self.match(string) or self.search(string) or default
                  
            def search(self, string, default=None):
                """Use re.search to find the result
                  
                :rtype: list"""
                default = default if default else []
                result = [item[1] for item in self.container if item[0].search(string)]
                if self.ensure_mapping:
                    assert len(result) < 2, "%s matches more than one pattern: %s" % (
                        string,
                        result,
                    )
                return result if result else default
                  
            def match(self, string, default=None):
                """Use re.search to find the result
                  
                :rtype: list"""
                default = default if default else []
                result = [item[1] for item in self.container if item[0].match(string)]
                if self.ensure_mapping:
                    assert len(result) < 2, "%s matches more than one pattern: %s" % (
                        string,
                        result,
                    )
                return result if result else default
                  
            def fuzzy(self, key, limit=5):
                """Give suggestion from all instances."""
                instances = [i[2] for i in self.container if i[2]]
                if not instances:
                    return
                instances = sum(instances, [])
                from fuzzywuzzy import process
                  
                maybe = process.extract(key, instances, limit=limit)
                return maybe
                  
            def _check_instances(self):
                for item in self.container:
                    for instance in item[2]:
                        assert self.search(instance) or self.match(
                            instance
                        ), "instance %s not fit pattern %s" % (instance, item[0].pattern)
                  
            def show_all(self, as_string=True):
                """, python2 will not show flags"""
                result = []
                for item in self.container:
                    pattern = str(item[0])[10:] if PY3 else item[0].pattern
                    instances = item[2] or []
                    value = (
                        '%s "%s"' % (item[1].__name__, (item[1].__doc__ or ""))
                        if callable(item[1])
                        else str(item[1])
                    )
                    value = "%s %s" % (type(item[1]), value)
                    result.append(" => ".join((pattern, ",".join(instances), value)))
                return "\n".join(result) if as_string else result
        
  11. Common User-Agent strings

    1. Code

      class UA:
          """Some common User-Agents for crawler.
             
          Android, iPhone, iPad, Firefox, Chrome, IE6, IE9"""
             
          __slots__ = ()
          Android = "Mozilla/5.0 (Linux; Android 5.1.1; Nexus 6 Build/LYZ28E) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/63.0.3239.84 Mobile Safari/537.36"
          iPhone = "Mozilla/5.0 (iPhone; CPU iPhone OS 10_3 like Mac OS X) AppleWebKit/602.1.50 (KHTML, like Gecko) CriOS/56.0.2924.75 Mobile/14E5239e Safari/602.1"
          iPad = "Mozilla/5.0 (iPad; CPU OS 9_1 like Mac OS X) AppleWebKit/601.1.46 (KHTML, like Gecko) Version/9.0 Mobile/13B143 Safari/601.1"
          Firefox = (
              "Mozilla/5.0 (Windows NT 10.0; WOW64; rv:54.0) Gecko/20100101 Firefox/54.0"
          )
          Chrome = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/63.0.3239.84 Safari/537.36"
          IE6 = "Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1)"
          IE9 = "Mozilla/5.0 (compatible; MSIE 9.0; Windows NT 6.1; Trident/5.0)"
          WECHAT_ANDROID = "Mozilla/5.0 (Linux; Android 5.0; SM-N9100 Build/LRX21V) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/37.0.0.0 Mobile Safari/537.36 MicroMessenger/6.0.2.56_r958800.520 NetType/WIFI"
          WECHAT_IOS = "Mozilla/5.0 (iPhone; CPU iPhone OS 5_1 like Mac OS X) AppleWebKit/534.46 (KHTML, like Gecko) Mobile/9B176 MicroMessenger/4.3.2"
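
      # Typical usage: plug one of these into a request's headers.
      headers = {"User-Agent": UA.Chrome}
      # e.g. requests.get(url, headers=headers)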
      
  12. A simple timer gadget: usable as a decorator, inside a function, or globally at module level

    1. The idea is to grab stack information via sys._getframe(stack_level).f_code.co_name

    2. By default it prints the elapsed time when it is deleted (i.e. garbage-collected); you can also call timer.x to print it once

    3. timeit.default_timer is a better choice than time.time

    4. See the examples in the class docstring for concrete usage.

    5. P.S. This class was written before PySnooper existed, otherwise I would have borrowed its approach; wandering around in stack frames by hand is fairly error-prone. (A standalone sketch of the _getframe trick follows the class below.)

      class Timer(object):
          """
          Usage:
              init Timer anywhere:
                  such as head of function, or head of module, then it will show log after del it by gc.
             
              :param name: be used in log or None.
              :param log_func: some function to show process.
              :param default_timer: use `timeit.default_timer` by default.
              :param rounding: None, or seconds will be round(xxx, rounding)
              :param readable: None, or use `timepass`: readable(cost_seconds) -> 00:00:01,234
             
              ::
             
                  from torequests.utils import Timer
                  import time
                  Timer()
             
                  @Timer.watch()
                  def test(a=1):
                      Timer()
                      time.sleep(1)
             
                      def test_inner():
                          t = Timer('test_non_del')
                          time.sleep(1)
                          t.x
             
                      test_inner()
             
                  test(3)
                  time.sleep(1)
                  # [2018-03-10 02:16:48]: Timer [00:00:01]: test_non_del, start at 2018-03-10 02:16:47.
                  # [2018-03-10 02:16:48]: Timer [00:00:02]: test(a=3), start at 2018-03-10 02:16:46.
                  # [2018-03-10 02:16:48]: Timer [00:00:02]: test(3), start at 2018-03-10 02:16:46.
                  # [2018-03-10 02:16:49]: Timer [00:00:03]: <module>: __main__ (temp_code.py), start at 2018-03-10 02:16:46.
             
          """
             
          def __init__(
              self,
              name=None,
              log_func=None,
              default_timer=None,
              rounding=None,
              readable=None,
              log_after_del=True,
              stack_level=1,
          ):
              readable = readable or timepass
              self._log_after_del = False
              self.start_at = time.time()
              uid = md5("%s%s" % (self.start_at, id(self)))
              if not name:
                  f_name = sys._getframe(stack_level).f_code.co_name
                  f_local = sys._getframe(stack_level).f_locals
                  if f_name == "<module>":
                      f_vars = ": %s (%s)" % (
                          f_local.get("__name__"),
                          os.path.split(f_local.get("__file__"))[-1],
                      )
                      # f_vars = f_vars.replace(' __main__', '')
                  else:
                      f_vars = (
                          "(%s)"
                          % ", ".join(
                              [
                                  "%s=%s" % (i, repr(f_local[i]))
                                  for i in sorted(f_local.keys())
                              ]
                          )
                          if f_local
                          else "()"
                      )
                  if self not in f_local.values():
                      # add self to name space for __del__ way.
                      sys._getframe(stack_level).f_locals.update(**{uid: self})
                  name = "%s%s" % (f_name, f_vars)
              self.name = name
              self.log_func = log_func
              self.timer = default_timer or timeit.default_timer
              self.rounding = rounding
              self.readable = readable
              self.start_timer = self.timer()
              self._log_after_del = log_after_del
             
          @property
          def string(self):
              """Only return the expect_string quietly."""
              return self.tick()
             
          @property
          def x(self):
              """Call self.log_func(self) and return expect_string."""
              self._log_after_del = False
              passed_string = self.string
              if self.log_func:
                  self.log_func(self)
              else:
                  print_info(
                      "Timer [%(passed)s]: %(name)s, start at %(start)s."
                      % (
                          dict(
                              name=self.name, start=ttime(self.start_at), passed=passed_string
                          )
                      )
                  )
              return passed_string
             
          @property
          def passed(self):
              """Return the cost_seconds after starting up."""
              return self.timer() - self.start_timer
             
          def tick(self):
              """Return the time cost string as expect."""
              string = self.passed
              if self.rounding:
                  string = round(string)
              if self.readable:
                  string = self.readable(string)
              return string
             
          @staticmethod
          def watch(*timer_args, **timer_kwargs):
              """Decorator for Timer."""
             
              def wrapper(function):
                  @wraps(function)
                  def inner(*args, **kwargs):
                      args1 = ", ".join(map(repr, args)) if args else ""
                      kwargs1 = ", ".join(
                          ["%s=%s" % (i, repr(kwargs[i])) for i in sorted(kwargs.keys())]
                      )
                      arg = ", ".join(filter(None, [args1, kwargs1]))
                      name = "%s(%s)" % (function.__name__, arg)
                      _ = Timer(name=name, *timer_args, **timer_kwargs)
                      result = function(*args, **kwargs)
                      return result
             
                  return inner
             
              return wrapper
             
          def __del__(self):
              if self._log_after_del:
                  # not be called by self.x yet.
                  self.x
             
          def __enter__(self):
              return self
             
          def __exit__(self, *args):
              self.x
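
      # A standalone sketch of the stack-inspection trick mentioned above: sys._getframe(1)
      # is the caller's frame, and its f_code.co_name is the caller's function name, which is
      # how Timer guesses a default name for itself.
      import sys


      def who_called_me():
          return sys._getframe(1).f_code.co_name


      def my_task():
          print(who_called_me())


      my_task()  # my_task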
      
  13. A dead-simple clipboard helper

    1. The purpose is obvious: start a clipboard watcher and it triggers a callback whenever the clipboard content changes

    2. The default callback simply prints the content as-is, plus a newline

    3. It also supports an async call, i.e. one that does not block the main thread. (A short usage sketch follows the class below.)

      class ClipboardWatcher(object):
          """Watch clipboard with `pyperclip`, run callback while changed."""
             
          def __init__(self, interval=0.2, callback=None):
              self.pyperclip = try_import("pyperclip")
              self.interval = interval
              self.callback = callback or self.default_callback
              self.temp = self.current
             
          def read(self):
              """Return the current clipboard content."""
              return self.pyperclip.paste()
             
          def write(self, text):
              """Rewrite the current clipboard content."""
              return self.pyperclip.copy(text)
             
          @property
          def current(self):
              """Return the current clipboard content."""
              return self.read()
             
          def default_callback(self, text):
              """Default clean the \\n in text."""
              text = text.replace("\r\n", "\n")
              text = "%s\n" % text
              flush_print(text, sep="", end="")
              return text
             
          def watch(self, limit=None, timeout=None):
              """Block method to watch the clipboard changing."""
              start_time = time.time()
              count = 0
              while not timeout or time.time() - start_time < timeout:
                  new = self.read()
                  if new != self.temp:
                      count += 1
                      self.callback(new)
                      if count == limit:
                          break
                  self.temp = new
                  time.sleep(self.interval)
             
          @property
          def x(self):
              """Return self.watch()"""
              return self.watch()
             
          @threads(1)
          def watch_async(self, limit=None, timeout=None):
              """Non-block method to watch the clipboard changing."""
              return self.watch(limit=limit, timeout=timeout)
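
      # A hedged usage sketch for the class above: it needs `pip install pyperclip`, plus the
      # try_import / flush_print / threads helpers from torequests that the class relies on.
      # This call blocks until the clipboard changes twice, printing each new value.
      watcher = ClipboardWatcher(interval=0.5)
      watcher.watch(limit=2)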
      
  14. Common local data persistence

    1. The goal is simple: store a little data locally as key/value pairs

    2. I used to prefer the sqlitedict library, whose performance is beyond question, but you have to keep cleaning up the sqlite cache (VACUUM)

    3. The default path sits under the user's home directory on Windows / Linux, and both pickle and json storage are supported

    4. Early on I used this a lot for config files or simple local caching; less so now, since it never really solved multi-process race conditions. (A short usage sketch follows the class below.)

      class Saver(object):
          """
          Simple object persistent toolkit with pickle/json,
          if only you don't care the performance and security.
          **Do not set the key startswith "_"**
             
          :param path: if not set, will be ~/_saver.db. print(self._path) to show it.
              Set pickle's protocol < 3 for compatibility between python2/3,
              but use -1 for performance and some other optimizations.
          :param save_mode: pickle / json.
          >>> ss = Saver()
          >>> ss._path
          '/home/work/_saver.json'
          >>> ss.a = 1
          >>> ss['b'] = 2
          >>> str(ss)
          {'a': 1, 'b': 2, 'c': 3, 'd': 4}
          >>> del ss.b
          >>> str(ss)
          "{'a': 1, 'c': 3, 'd': 4}"
          >>> ss._update({'c': 3, 'd': 4})
          >>> ss
          Saver(path="/home/work/_saver.json"){'a': 1, 'c': 3, 'd': 4}
          """
             
          _instances = {}
          _locks = {}
          _protected_keys = {
              "_auto_backup",
              "_lock",
              "_path",
              "_saver_args",
              "_save_mode",
              "_cache",
              "__getitem__",
              "_keys",
              "_values",
              "__getattr__",
              "__len__",
              "_popitem",
              "_shutdown",
              "__setitem__",
              "__delitem__",
              "_save_obj",
              "_get",
              "__dict__",
              "_clear",
              "_locks",
              "__weakref__",
              "_items",
              "__module__",
              "_pop",
              "__contains__",
              "_load",
              "_save",
              "_update",
              "_set",
              "_protected_keys",
              "_instances",
              "_get_home_path",
              "_save_back_up",
          }
          _protected_keys = _protected_keys | set(object.__dict__.keys())
             
          def __new__(cls, path=None, save_mode="json", auto_backup=False, **saver_args):
              # BORG
              path = path or cls._get_home_path(save_mode=save_mode)
              return cls._instances.setdefault(path, super(Saver, cls).__new__(cls))
             
          def __init__(self, path=None, save_mode="json", auto_backup=False, **saver_args):
              super(Saver, self).__init__()
              self._auto_backup = auto_backup
              self._lock = self.__class__._locks.setdefault(path, Lock())
              self._path = path or self._get_home_path(save_mode=save_mode)
              self._saver_args = saver_args
              self._save_mode = save_mode
              self._cache = self._load()
             
          @classmethod
          def _get_home_path(cls, save_mode=None):
              home = os.path.expanduser("~")
              if save_mode == "json":
                  ext = "json"
              elif save_mode == "pickle":
                  ext = "pkl"
              else:
                  ext = "db"
              file_name = "_saver.%s" % ext
              path = os.path.join(home, file_name)
              return path
             
          def _save_back_up(self):
              with open(self._path, "rb") as f_raw:
                  with open(self._path + ".bk", "wb") as f_bk:
                      f_bk.write(f_raw.read())
             
          def _save_obj(self, obj):
              mode = "wb" if self._save_mode == "pickle" else "w"
              with self._lock:
                  with open(self._path, mode) as f:
                      if self._save_mode == "json":
                          json.dump(obj, f, **self._saver_args)
                      if self._save_mode == "pickle":
                          pickle.dump(obj, f, **self._saver_args)
                  if self._auto_backup:
                      self._save_back_up()
              return obj
             
          def _load(self):
              if not (os.path.isfile(self._path) and os.path.getsize(self._path)):
                  cache = {}
                  self._save_obj(cache)
                  return cache
              mode = "rb" if self._save_mode == "pickle" else "r"
              with self._lock:
                  with open(self._path, mode) as f:
                      if self._save_mode == "json":
                          return json.load(f)
                      if self._save_mode == "pickle":
                          return pickle.load(f)
             
          def _save(self):
              return self._save_obj(self._cache)
             
          def _set(self, key, value):
              if self._save_mode == "json":
                  try:
                      json.dumps(value)
                  except TypeError:
                      Config.utils_logger.warning(
                          "Saver._set(%s, %s) failed: bad type, using str(value) instead."
                          % (key, value)
                      )
                      value = str(value)
              self._cache[key] = value
              self._save()
             
          def _get(self, key, default=None):
              return self._cache.get(key, default)
             
          def __setattr__(self, key, value):
              if key in self._protected_keys:
                  object.__setattr__(self, key, value)
              else:
                  self._set(key, value)
             
          def __getattr__(self, key):
              if key in self._protected_keys:
                  return object.__getattribute__(self, key)
              return self._get(key)
             
          def __contains__(self, key):
              return key in self._cache
             
          def __delattr__(self, key):
              self._cache.pop(key, None)
              self._save()
             
          def __dir__(self):
              return dir(object)
             
          def __len__(self):
              return len(self._cache)
             
          def _clear(self):
              self._cache = {}
              self._save()
             
          def _shutdown(self):
              if self._auto_backup:
                  os.remove(self._path + ".bk")
              return os.remove(self._path)
             
          def _keys(self):
              return self._cache.keys()
             
          def _items(self):
              return self._cache.items()
             
          def _values(self):
              return self._cache.values()
             
          def _pop(self, key, default=None):
              result = self._cache.pop(key, default)
              self._save()
              return result
             
          def _popitem(self):
              result = self._cache.popitem()
              self._save()
              return result
             
          def _update(self, *args, **kwargs):
              self._cache.update(*args, **kwargs)
              self._save()
             
          def __getitem__(self, key):
              if key in self._cache:
                  return self._get(key)
              raise KeyError
             
          def __setitem__(self, key, value):
              self._set(key, value)
             
          def __delitem__(self, key):
              self._cache.pop(key, None)
              self._save()
             
          def __str__(self):
              return str(self._cache)
             
          def __repr__(self):
              return 'Saver(path="%s")%s' % (self._path, reprlib.repr(self._cache))
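
      # A small usage sketch of the key/value persistence described above (the class as shown
      # relies on json / pickle / os / reprlib / Lock / Config imported elsewhere in torequests).
      config = Saver(save_mode="json")
      config.token = "abc123"     # written to ~/_saver.json immediately
      print(config.token)         # abc123
      print("token" in config)    # True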
      
  15. Estimate the average interval of a large pile of unordered numbers

    1. The original use case was working out an author's publishing pattern from the publish times of ten-thousand-plus articles; since only a rough estimate was needed, I did not want to pull in any machine-learning libraries and just wrote this simple version

    2. Method: sort the sequence, take the differences between adjacent numbers, then sort those differences and take the median. (A worked example follows the function below.)

      def guess_interval(nums, accuracy=0):
          """Given a seq of number, return the median, only calculate interval >= accuracy.
             
          ::
             
              from torequests.utils import guess_interval
              import random
             
              seq = [random.randint(1, 100) for i in range(20)]
              print(guess_interval(seq, 5))
              # sorted_seq: [2, 10, 12, 19, 19, 29, 30, 32, 38, 40, 41, 54, 62, 69, 75, 79, 82, 88, 97, 99]
              # diffs: [8, 7, 10, 6, 13, 8, 7, 6, 6, 9]
              # median: 8
          """
          if not nums:
              return 0
          nums = sorted([int(i) for i in nums])
          if len(nums) == 1:
              return nums[0]
          diffs = [nums[i + 1] - nums[i] for i in range(len(nums) - 1)]
          diffs = [item for item in diffs if item >= accuracy]
          sorted_diff = sorted(diffs)
          result = sorted_diff[len(diffs) // 2]
          return result
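
      # A worked example of the method described above (the timestamps are made up): the
      # adjacent differences are [105, 105, 110, 580, 105]; sorting them and taking the middle
      # element gives 105, so the single 580 outlier is ignored.
      publish_times = [100, 205, 310, 420, 1000, 1105]
      print(guess_interval(publish_times))  # 105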
      
  16. Split a string into a multi-dimensional list by a list of separators

    1. Rarely used; see the docstring for usage

    2. It mainly just saves a few layers of nested for-loops

      import re
             
             
      def _re_split_mixin(string, sep, reg=False):
          if reg:
              return re.split(sep, string)
          else:
              return string.split(sep)
             
             
      def split_n(string, seps, reg=False):
          r"""Split strings into n-dimensional list.
             
          ::
             
              from torequests.utils import split_n
             
              ss = '''a b c  d e f  1 2 3  4 5 6
              a b c  d e f  1 2 3  4 5 6
              a b c  d e f  1 2 3  4 5 6'''
             
              print(split_n(ss, ('\n', '  ', ' ')))
              # [[['a', 'b', 'c'], ['d', 'e', 'f'], ['1', '2', '3'], ['4', '5', '6']], [['a', 'b', 'c'], ['d', 'e', 'f'], ['1', '2', '3'], ['4', '5', '6']], [['a', 'b', 'c'], ['d', 'e', 'f'], ['1', '2', '3'], ['4', '5', '6']]]
              print(split_n(ss, ['\s+'], reg=1))
              # ['a', 'b', 'c', 'd', 'e', 'f', '1', '2', '3', '4', '5', '6', 'a', 'b', 'c', 'd', 'e', 'f', '1', '2', '3', '4', '5', '6', 'a', 'b', 'c', 'd', 'e', 'f', '1', '2', '3', '4', '5', '6']
          """
          deep = len(seps)
          if not deep:
              return string
          return [split_n(i, seps[1:]) for i in _re_split_mixin(string, seps[0], reg=reg)]
      
  17. Throw a function into the background, without making the main thread wait for it before exiting

    1. Put plainly: run a synchronous, blocking function in a new thread

    2. Normally the main thread cannot exit until that thread has finished

    3. But with Thread.daemon = True, the main thread does not have to block and wait for that thread when it exits

    4. This one is a bit underwhelming... atexit is probably more useful...

      from functools import wraps
      from threading import Thread


      def bg(func):
          """Run a function in background, will not block main thread's exit.(thread.daemon=True)
          ::
             
              from torequests.utils import bg, print_info
              import time
             
              def test1(n):
                  time.sleep(n)
                  print_info(n, 'done')
             
              @bg
              def test2(n):
                  time.sleep(n)
                  print_info(n, 'done')
             
              test3 = bg(test1)
             
              test2(1)
              test3(1)
              print_info('not be blocked')
              time.sleep(2)
             
              # [2018-06-12 23:46:19](L81): not be blocked
              # [2018-06-12 23:46:20](L81): 1 done
              # [2018-06-12 23:46:20](L81): 1 done
          """
             
          @wraps(func)
          def wrapper(*args, **kwargs):
              t = Thread(target=func, args=args, kwargs=kwargs)
              t.daemon = True  # daemon thread, so the main thread's exit is not blocked
              t.start()
              return t
             
          return wrapper
      
  18. A simple countdown

    1. By default it blocks the main thread and prints the remaining seconds without line breaks.

    2. The usual use is showing the remaining wait time between two crawler requests
      def countdown(
          seconds=None,
          block=True,
          interval=1,
          daemon=True,
          tick_callback=None,
          finish_callback=None,
      ):
          """Run a countdown function to wait something, similar to threading.Timer,
           but will show the detail tick by tick_callback.
           ::
             
              from torequests.utils import countdown
             
              countdown(3)
              # 3 2 1 
              # countdown finished [3 seconds]: 2018-06-13 00:12:55 => 2018-06-13 00:12:58.
              countdown('2018-06-13 00:13:29')
              # 10 9 8 7 6 5 4 3 2 1 
              # countdown finished [10 seconds]: 2018-06-13 00:13:18 => 2018-06-13 00:13:28.
      """
             
          def default_tick_callback(s, seconds, *args):
              flush_print(s, sep="", end=" ")
             
          def default_finish_callback(seconds, start_time):
              flush_print()
             
          def cd(seconds, interval):
              for s in range(seconds, 0, -interval):
                  tick_callback(s, seconds, interval)
                  time.sleep(interval)
              if callable(finish_callback):
                  finish_callback(seconds, start_time)
             
          start_time = time.time()
          tick_callback = tick_callback or default_tick_callback
          finish_callback = (
              default_finish_callback if finish_callback is None else finish_callback
          )
             
          if unicode(seconds).isdigit():
              seconds = int(seconds)
          elif isinstance(seconds, (unicode, str)):
              seconds = int(ptime(seconds) - time.time())
          t = Thread(target=cd, args=(seconds, interval))
          t.daemon = daemon
          t.start()
          if block:
              t.join()
      
  19. A bare-bones progress bar

    1. I just wanted a rough view of overall progress without pulling in extra dependencies

    2. See Basic Usage for how to use it

      import sys
      from fractions import Fraction

      unicode = str  # Python 3; torequests aliases this internally (see item 22)


      def flush_print(*args, **kwargs):
          """
          Like print_function at python3, support flush, but not support file.
          :param sep: space by default
          :param end: '\\n' by default
          :param flush: True by default
             
           ::
             
              import time
              from torequests.utils import flush_print
             
              flush_print("=" * 10)
              for _ in range(10):
                  time.sleep(0.2)
                  flush_print("=", sep="", end="")
          """
          # PY2 raise SyntaxError for : def flush_print(*args, sep='', end=''):
          sep, end, flush = (
              kwargs.pop("sep", " "),
              kwargs.pop("end", "\n"),
              kwargs.pop("flush", 1),
          )
          string = sep.join((unicode(i) for i in args))
          sys.stdout.write("%s%s" % (string, end))
          if flush:
              sys.stdout.flush()
             
             
      class ProgressBar(object):
          """Simple progress bar.
              :param size: total counts of calling ProgressBar.x.
              :param length: length of print log.
              :param sig: string of each printing log.
             
          Basic Usage::
             
              pb = ProgressBar(50, 10)
              for _ in range(50):
                  time.sleep(0.1)
                  pb.x
              print("current completion rate:", pb.completion_rate)
              # ==========
              # ==========
              # current completion rate: 1.0
          """
             
          def __init__(self, size, length=100, sig="="):
              self.size = size or 0
              self.length = length
              self.sig = sig
              self.current = 0
              self.last_print = 0
              self.printed = 0
              if size:
                  # use Fraction for the deviation of division
                  self.chunk = Fraction(self.size, self.length)
                  flush_print(self.sig * self.length)
              else:
                  self.chunk = 1
             
          def add(self, step):
              # ensure step >= 0
              self.current += step
              count = int((self.current - self.last_print) / self.chunk)
              if count < 1:
                  return self.printed
              for _ in range(count):
                  self.printed += 1
                  flush_print(self.sig, end="")
              self.last_print = count * self.chunk + self.last_print
              if self.current == self.size:
                  flush_print()
              return self.printed
             
          @property
          def x(self):
              return self.add(1)
             
          @property
          def completion_rate(self):
              return self.current / self.size
      
  20. JS-style regex match

    1. That is: give back the match if there is one, otherwise an empty string; just never raise

    2. It can also be registered onto the re module

    3. See Basic Usage for how to use it

      import re

      # `null` below stands in for torequests' Null sentinel used as the "missing attribute" default.
      null = object()


      class RegMatch(object):
          """JS-like match object. Use index number to get groups, if not match or no group, will return ''."""
             
          def __init__(self, item):
              self.item = item
             
          def __getattr__(self, key, default=null):
              return getattr(self.item, key, default)
             
          def __getitem__(self, index):
              if self.item is None:
                  return ""
              if not isinstance(index, int):
                  raise IndexError
              try:
                  return self.item.group(index)
              except IndexError:
                  return ""
             
          @classmethod
          def find_one(cls, pattern, string, flags=0):
              """JS-like match object. Use index number to get groups, if not match or no group, will return ''.
             
              Basic Usage::
             
                  >>> from torequests.utils import find_one
                  >>> string = "abcd"
                  >>> find_one("a.*", string)
                  <torequests.utils.RegMatch object at 0x0705F1D0>
                  >>> find_one("a.*", string)[0]
                  'abcd'
                  >>> find_one("a.*", string)[1]
                  ''
                  >>> find_one("a(.)", string)[0]
                  'ab'
                  >>> find_one("a(.)", string)[1]
                  'b'
                  >>> find_one("a(.)", string)[2] or "default"
                  'default'
                  >>> import re
                  >>> item = find_one("a(B)(C)", string, flags=re.I | re.S)
                  >>> item
                  <torequests.utils.RegMatch object at 0x0705F1D0>
                  >>> item[0]
                  'abc'
                  >>> item[1]
                  'b'
                  >>> item[2]
                  'c'
                  >>> item[3]
                  ''
                  >>> # import re
                  >>> # re.findone = find_one
                  >>> register_re_findone()
                  >>> re.findone('a(b)', 'abcd')[1] or 'default'
                  'b'
             
              """
              item = re.search(pattern, string, flags=flags)
              return cls(item)
             
             
      def register_re_findone():
          """import re; re.findone = find_one"""
          re.findone = find_one
             
             
      find_one = RegMatch.find_one
      
  21. A thread-safe cooldown pool

    1. Originally built to cool down proxies and dodge anti-crawler bans

    2. An interval is given at construction time: an item that has not cooled down for that many seconds cannot be taken out

    3. It relies mainly on the standard PriorityQueue, which is thread-safe by nature

    4. To avoid scanning the whole pool, items are ordered by their last-used time before going back into the queue, so the longest-unused item sits at the front

    5. If the item at the head of the queue was used less than interval seconds ago, sleep for the remaining difference. (A proxy-rotation sketch follows the class below.)

      class TimeItem(object):
          """Used for Cooldown."""
          __slots__ = ('data', 'use_at')
             
          def __init__(self, data, use_at):
              self.data = data
              self.use_at = use_at
             
          def __hash__(self):
              return hash(self.data)
             
          def __gt__(self, other):
              return self.use_at > other.use_at
             
          def __ge__(self, other):
              return self.use_at >= other.use_at
             
          def __lt__(self, other):
              return self.use_at < other.use_at
             
          def __le__(self, other):
              return self.use_at <= other.use_at
             
          def __eq__(self, other):
              return self.use_at == other.use_at
             
          def __ne__(self, other):
              return self.use_at != other.use_at
             
             
      class Cooldown(object):
          """Thread-safe Cooldown toolkit.
             
          :param init_items: iterables to add into the default queue at first.
          :param interval: each item will cooldown `interval` seconds before return.
          :param born_at_now: if be set True, the item.use_at will be set time.time()
                  instead of 0 when adding to queue at the first time.
             
          >>> from torequests.logs import print_info
          >>> cd = Cooldown(range(1, 3), interval=2)
          >>> cd.add_items([3, 4])
          >>> cd.add_item(5)
          >>> for _ in range(7):
          ...     print_info(cd.get(1, 'timeout'))
          [2019-01-17 01:50:59] pyld.py(152): 1
          [2019-01-17 01:50:59] pyld.py(152): 3
          [2019-01-17 01:50:59] pyld.py(152): 5
          [2019-01-17 01:50:59] pyld.py(152): 2
          [2019-01-17 01:50:59] pyld.py(152): 4
          [2019-01-17 01:51:00] pyld.py(152): timeout
          [2019-01-17 01:51:01] pyld.py(152): 1
          >>> cd.size
          5
          """
             
          def __init__(self, init_items=None, interval=0, born_at_now=False):
              self.interval = interval
              self.queue = PriorityQueue()
              self.use_at_function = self.get_now_timestamp if born_at_now else lambda: 0
              self.add_items(init_items or [])
             
          @property
          def size(self):
              return self.queue.qsize()
             
          @property
          def all_items(self):
              return [item.data for item in self.queue.queue]
             
          def get_now_timestamp(self):
              return time.time()
             
          def add_item(self, item):
              if not isinstance(item, TimeItem):
                  item = TimeItem(item, self.use_at_function())
              self.queue.put(item)
             
          def add_items(self, items):
              for item in items:
                  self.add_item(item)
             
          def remove_item(self, item):
              self.queue.queue = [i for i in self.queue.queue if i.data != item]
              return self.queue.qsize()
             
          def remove_items(self, items):
              # keep only the entries whose data is NOT in `items`
              self.queue.queue = [i for i in self.queue.queue if i.data not in items]
              return self.queue.qsize()
             
          def get(self, timeout=None, default=None):
              try:
                  start_time = time.time()
                  if timeout is None:
                      timeout = float('inf')
                  while time.time() - start_time < timeout:
                      item = self.queue.get(timeout=timeout)
                      if time.time() - item.use_at < self.interval:
                          self.queue.put(item)
                          wait_time = self.interval - (time.time() - item.use_at)
                          wait_time = min((wait_time, timeout))
                          time.sleep(wait_time)
                          continue
                      item.use_at = self.get_now_timestamp()
                      self.queue.put(item)
                      return item.data
                  else:
                      return default
              except Empty:
                  return default
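
      # A proxy-rotation sketch of the use case described above (the proxy addresses are made
      # up; time / PriorityQueue / Empty come from the compat imports shown in the next item).
      proxies = Cooldown(["http://1.1.1.1:8000", "http://2.2.2.2:8000"], interval=10)
      first = proxies.get(timeout=1)
      second = proxies.get(timeout=1)
      print(first != second)  # True: the same proxy will not be handed out twice within 10 seconds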
      
  22. Unify the names of the common URL-parsing helpers across Python 2 and Python 3

    1. Most of this work is already done by requests.compat

      if PY2:
          import repr as reprlib
          from Queue import Empty, PriorityQueue
          from urllib import quote, quote_plus, unquote_plus
          from urlparse import (
              parse_qs,
              parse_qsl,
              urlparse,
              unquote,
              urljoin,
              urlsplit,
              urlunparse,
          )
          from cgi import escape
          import HTMLParser
             
          unescape = HTMLParser.HTMLParser().unescape
             
      if PY3:
          import reprlib
          from urllib.parse import (
              parse_qs,
              parse_qsl,
              urlparse,
              quote,
              quote_plus,
              unquote,
              unquote_plus,
              urljoin,
              urlsplit,
              urlunparse,
          )
          from html import escape, unescape
          from queue import Empty, PriorityQueue
             
          unicode = str
      
  23. Stress testing / probing anti-crawler rate limits

    1. Not much to say about this one: it is not very ethical, it breaks the "don't hammer the server" rule of crawling, and a DoS can even get you into legal trouble

      1. That said, with n=1 and interval=30 it becomes one request every 30 seconds, which can run as a test over several days; the logs then reveal the safe request frequency under the server's anti-crawler rules.
      2. The code that searches for the safe frequency automatically is not finished yet; the idea is to start from a very large interval (say 60s) and, every so often (e.g. every 100 requests or every hour), cut the interval by one percent or one tenth, until the anti-crawler measures kick in and requests start failing or their results change.
    2. Because of the md5 and other bookkeeping, throughput takes a big hit: using torequests.Requests (based on aiohttp) directly reaches about 1500 qps, while native golang reaches about 6000 qps...

      from torequests.crawlers import StressTest
             
      StressTest('http://localhost:8080', n=100, timeout=2).x
      # [2019-10-25 20:12:01] crawlers.py(485): [17496] response: 8389296c-2, start at 2019-10-25 20:11:38 (+00:00:23), 0.131s, 754.92 req/s [100.00 %]
      # [2019-10-25 20:12:01] crawlers.py(485): [17497] response: 8389296c-2, start at 2019-10-25 20:11:38 (+00:00:23), 0.131s, 754.96 req/s [100.00 %]
      # [2019-10-25 20:12:01] crawlers.py(485): [17498] response: 8389296c-2, start at 2019-10-25 20:11:38 (+00:00:23), 0.132s, 754.97 req/s [100.00 %]
      # [2019-10-25 20:12:01] crawlers.py(485): [17499] response: 8389296c-2, start at 2019-10-25 20:11:38 (+00:00:23), 0.132s, 755.02 req/s [100.00 %]
      # [2019-10-25 20:12:01] crawlers.py(485): [17500] response: 8389296c-2, start at 2019-10-25 20:11:38 (+00:00:23), 0.131s, 755.03 req/s [100.00 %]
      
  24. Clean up a complex request, keeping only the minimal set of parameters

    1. The source code is a bit messy and not very elegant, so it is not shown here, but it works

    2. If you worry about anti-crawler measures, just throttle the request frequency

    3. How it works: take all the Request parameters and remove them one by one; if removing a parameter changes the callback result (the md5 of resp.content by default), that parameter cannot be ignored

      from torequests.crawlers import CleanRequest
      request = '''curl 'https://p.3.cn?skuIds=1&nonsense=1&nonce=0' -H 'Pragma: no-cache' -H 'DNT: 1' -H 'Accept-Encoding: gzip, deflate' -H 'Accept-Language: zh-CN,zh;q=0.9' -H 'Upgrade-Insecure-Requests: 1' -H 'User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/63.0.3239.84 Safari/537.36' -H 'Accept: text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8' -H 'Cache-Control: no-cache' -H 'Referer: https://p.3.cn?skuIds=1&nonsense=1&nonce=0' -H 'Cookie: ASPSESSIONIDSQRRSADB=MLHDPOPCAMBDGPFGBEEJKLAF' -H 'Connection: keep-alive' --compressed'''
      

      c = CleanRequest(request)
      print(c.x)
      # {'url': 'https://p.3.cn', 'method': 'get'}

Summary

Most of this code came out of exercises on common problems I ran into while learning; live and learn.

One line for those who enjoy teaching themselves:

"Wishing to cultivate their persons, they first rectified their hearts. Wishing to rectify their hearts, they first sought to be sincere in their thoughts. Wishing to be sincere in their thoughts, they first extended to the utmost their knowledge. Such extension of knowledge lay in the investigation of things." (The Great Learning)

"It is characteristic of the most entire sincerity to be able to foreknow." (The Doctrine of the Mean)