快速清空超大数据表

作者:matrix 发布时间:2020年8月31日星期一 分类:Python 零零星星

第一次drop超过GB的数据表,没想到竟然会执行的这么慢。尝试过TRUNCATEDROP都不满意。
后来就直接找到数据库储存的文件来删除,这样比起使用sql语句操作会快得多,但也是危险操作,无法找回。

删除操作脚本

运行环境 python3.7,依赖pymysql,根据自身情况配置变量mysql_data_dir,db_config,table_names,condition_save

fast_drop_table.py

#codeing=utf-8
"""
快速清空超大数据表 保留想要数据
"""
import pymysql
import os

mysql_data_dir = '/mnt/mysql_data/db_name/' #数据库文件所在路径

# 数据库连接配置
db_config = {'host': '127.0.0.1', 'port': 3306, 'user': 'user', 'password': 'password', 'db': 'db_name', 'charset': 'utf8'}

# 需要清空操作的数据表
table_names = [
"com_hhtjim_badata_trades_eos_this_quarter",
"com_hhtjim_badata_trades_eth_this_quarter",
  ]

# 数据表保留的查询条件
condition_save = "timestamp  >  '2020-02-20T00:00:00Z'"
# condition_save = False# 不保留


class Db:
    '''
    简单数据库连接操作类
    '''
    def __init__(self,**kwargs):
        self.connection = pymysql.connect(**kwargs)
        self.cursor = self.connection.cursor()


if __name__ == "__main__":
  mysql = Db(**db_config)
  for table_name in table_names:
    os.link('{}{}.frm'.format(mysql_data_dir,table_name), '{}{}.frm.h'.format(mysql_data_dir,table_name))
    os.link('{}{}.ibd'.format(mysql_data_dir,table_name), '{}{}.ibd.h'.format(mysql_data_dir,table_name))


    mysql.cursor.execute('CREATE TABLE {0}_back like {0}'.format(table_name))
    mysql.connection.commit()


    if condition_save:
      mysql.cursor.execute("INSERT INTO {0}_back SELECT * FROM {0}  WHERE {1} ;".format(table_name,condition_save))
      mysql.connection.commit()


    mysql.cursor.execute("drop table {}".format(table_name))
    mysql.connection.commit()

    mysql.cursor.execute("alter table  {0}_back rename to  {0};".format(table_name))
    mysql.connection.commit()


    os.unlink('{}{}.frm.h'.format(mysql_data_dir,table_name))
    os.unlink('{}{}.ibd.h'.format(mysql_data_dir,table_name))

    print('succeed: {}'.format(table_name))

具体步骤

### 找到frm,ibd文件

根据数据库存储路径找到需要删除的表名的frm,ibd文件。

### 建立硬连接
$ ln mytable.ibd  mytable.ibd.h
$ ln mytable.frm  mytable.frm.h


### 备份表结构
CREATE TABLE mytable_back like mytable;

### 备份想要保留的数据
INSERT INTO mytable_back SELECT * FROM mytable  WHERE timestamp  >  '2020-02-27T00:00:00Z' ;

### 删除旧表
drop table mytable;

### 修改备份表名字
alter table  mytable_back rename to  mytable;


### 删除硬连接
$ rm -f  mytable.frm.h  mytable.ibd.h

参考:
https://blog.csdn.net/weixin_34034261/article/details/86250223

requests请求cookies本地持久化

作者:matrix 发布时间:2020年2月25日星期二 分类:Python

Python中单个地址进行请求我都是使用header的cookie中添加会话信息,简单干脆。但是多个域名跳转请求的时候就出现了问题,多域名的话需要按照域名host作为key来缓存,这岂不是很麻烦?

requests.Session()也很少使用,这次正好试试。requests.Session()可以作为全局变量来保存请求的cookies会话信息。这样在脚本的单次执行中可以很好的关联请求会话信息,避免要求多次登录的情况出现。

环境:Python 3.7

Session 单次会话

这里所谓单次会话其实就是单次运行脚本的一种效果
如果想要下次重新运行脚本依旧使用之前的cookies就必须要持久化处理。


import requests session = requests.Session() response1 = session.post('https://passport.baidu.com/v2/?login&tag=hhtjim.com') response2 = session.get('http://www.baidu.com/?tag=pang)

response1请求会返回Set-Cookie的响应头,Session会记录Set-Cookie的值然后在response2中携带Cookie的请求头。这些都是会话处理的效果,也就是requests自动完成。这样如果response1登录成功,则后续请求就可以直接进行,避免手动携带Cookie

Session本地持久化

现在需求是本地保存cookies信息,避免重新执行脚本的时候还要求登录。
本来没找到现成的方法只能自己序列化存储cookies数据,然后载入的时候反序列化就好了。但是后面看到http.cookiejar.MozillaCookieJa1这些函数可以处理目前的问题。

import requests,os
http.cookiejar import MozillaCookieJar


session = requests.Session() #作为全局变量使用

#载入cookies
path = 'cookies.txt' #设置cookies文件保存路径
s = MozillaCookieJar(path)
os.path.isfile(path) and s.load(path, ignore_discard=True, ignore_expires=True)#存在文件则载入
session.cookies = s #使用MozillaCookieJar进行会话管理   

response1 = session.post('https://passport.baidu.com/v2/?login&tag=hhtjim.com')

#触发保存会话到本地文件
session.cookies.save(ignore_discard=True, ignore_expires=True) 

response2 = session.get('http://www.baidu.com/?tag=pang)

上面操作就可以实现本地持久化存储,如果过期则会自动使用过期的Session请求续签。相对于单次会话其实就多了load和save操作,知道这基本原理也能够自己实现。
⚠️注意:
ignore_discard=True参数确保有开启,否则使用save方法不会保存到本地,load()处也是一致,避免无法读取。

如果想要清空会话使用clear()方法即可,再save()方法执行文件保存。

Note that the save() method won’t save session cookies anyway, unless you ask otherwise by passing a true ignore_discard argument.

参考:

https://stackoverflow.com/questions/13030095/how-to-save-requests-python-cookies-to-a-file

https://zhuanlan.zhihu.com/p/42950252


  1. CookieJar,LWPCookieJar都有实现save方法进行会话保存 

aiohttp + asyncio 异步网络请求基本操作

作者:matrix 发布时间:2019年11月26日星期二 分类:Python

asyncio异步操作需要关键字async,await
async用来声明方法函数,await用来声明耗时操作。
但是await关键字后面要求为awaitable对象 且只能在async方法内部使用,不能在外部入口中使用。asyncio的语法其实是系统内部实现了yield from协程。

aiohttp用来代替requests的请求库,且支持异步操作。
主要优点体现在并发请求多个耗时任务时,自动安排耗时时的操作,避免cpu等待一个一个请求。

单个请求操作

import aiohttp
import asyncio

#get 请求
async def get():
  async with aiohttp.request('GET','https://api.github.com/users/Ho',params={'arg1':123}) as response:
    # response.request_info # 请求信息
    return await response.json()

rel = asyncio.run(get())

# 或者使用下面方式 手动关闭异步事件循环
# loop = asyncio.get_event_loop()
# rel = loop.run_until_complete(get())
# loop.close()

print(rel)

多个并发请求操作

主要区别在于异步任务的添加操作,运行。

请求测试url:

http://link/await/1 # delay 1sec
http://link/await/2 # delay 2sec
...

请求测试:

import aiohttp
import asyncio

#get 请求
async def get():
  async with aiohttp.request('GET','http://link/await/1') as response:
    return await response.text()

# 所有请求任务
async def all_req():
#async with asyncio.Semaphore(5): 设置并发的连接数
# https://docs.python.org/zh-cn/3/library/asyncio-sync.html#asyncio.Semaphore

  task = []
  #添加请求任务
  for i in range(5):
    task.append(asyncio.create_task(get()))
  #create_task 方法等同于  ensure_future()方法
  #手册建议首选 create_task方法 
  # https://docs.python.org/zh-cn/3/library/asyncio-future.html?highlight=ensure_future#asyncio.ensure_future

  return await  asyncio.gather(*task)#传入参数 tuple类型 作为位置参数
  # 等同于 asyncio.gather(get(),get())
  # gather()方法用于收集所有任务完成的返回值,如果换成wait()方法会返回任务tuple对象,(done,pending)

rel = asyncio.run(all_req())
print(rel)

# 总共5个请求任务返回:
# 总耗时1秒多,相比同步的5秒+好N多。
"""
['sleep 1 second is done', 'sleep 1 second is done', 'sleep 1 second is done', 'sleep 1 second is done', 'sleep 1 second is done']

[Done] exited with code=0 in 1.955 seconds
"""

tell why??

测试发现Semaphore方法设置的请求并发数量跟本不起作用,nginx的access.log以及Proxifier看到的一次性请求量都不是代码中设置的数量。

使用uvloop优化异步操作

uvloop用于提升协程的速度。
uvloop使用很简单,直接设置异步策略就好了。

import asyncio
import uvloop

#声明使用 uvloop 事件循环
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())

测试遇到很多报错,基本上都是await和async使用的问题。

异步请求的分块chunk并发控制

自行chunk操作
自己按照所有任务的list列表进行chunk切割,然后分块进行请求,每块中固定chunk数量的任务。基本可以实现想要的并发限制操作

async def _bulk_task(num,current_page = 1):
  """批量创建异步任务
  """
  task = []
  for i in range(num):# 每次10个连接并发进行请求
    task.append(asyncio.create_task(get(current_page)))
    current_page += 1
  return await asyncio.gather(*task) 

# 主要进行chunk操作的函数
def run_task(total,chunk,offset_start_page = 1):
    """运行分块处理的批量任务

    Arguments:
        total int 总请求数
        chunk int 每次并发请求数
        offset_start_page int 初始分块开始的页数(偏移页数),正常默认为1

    Yields:
        返回收集的异步任务运行结果
    """

    length = math.ceil(total/chunk)
    for i in range(length):
        start_page = i * chunk + offset_start_page # 当前分块开始的页数
        haldle_num = chunk# 当前需要并发处理的数量

        #处理结尾的块
        if i == length - 1:
            # print(':::',chunk,start_page + chunk - offset_start_page)
            haldle_num = min(chunk,total + offset_start_page - start_page)

        # print('当前分块下标:{},当前分块需要处理的总数:{},当前分块开始页数:{}'.format(i,haldle_num,start_page))
        rel = asyncio.run(_bulk_task(haldle_num,start_page))
        yield rel


rel  = run_task(123,10)# 123总任务 每10条并发请求
for i in rel:
  print(i)

TODO

参考:

https://hubertroy.gitbooks.io/aiohttp-chinese-documentation/content/aiohttp%E6%96%87%E6%A1%A3/ClientUsage.html#%E6%84%89%E5%BF%AB%E5%9C%B0%E7%BB%93%E6%9D%9F

https://docs.Python.org/zh-cn/3/library/asyncio-eventloop.html#asyncio.get_running_loop

https://segmentfault.com/q/1010000008663962

http://www.ruanyifeng.com/blog/2019/11/Python-asyncio.html

https://blog.csdn.net/qq_37144341/article/details/89471603

https://www.jianshu.com/p/8f65e50f39b4

python 快速读取压缩包内文件

作者:matrix 发布时间:2019年10月14日星期一 分类:Python

搜索结果一大堆但都没有找到支持url和local path两种读取方式的操作。
留着便于以后直接使用。

gits: https://gist.github.com/Hootrix/cf3e75b1fa6d3d404bc99787f89687f1


import requests,tempfile, zipfile,os def read_file_for_zip(zip_url, callback=None): """ 读取zip包内的文件 :param zip_url:zip路径/url :param callback:读取操作的回调函数 若函数返回false 则不会读取下一个文件 :return: """ with tempfile.TemporaryFile('w+b') as tmpfile: # 生成临时文件 # 判断是否为本地文件 if os.path.isfile(zip_url): #进行本地复制。没必要 # with open(zip_url,'rb') as f: # while True: # chunk = f.read(1024) # if not chunk: # break # tmpfile.write(chunk) tmpfile = zip_url else:#进行http请求 r = requests.get(zip_url, stream=True) for chunk in r.iter_content(chunk_size=1024): if chunk: tmpfile.write(chunk) assert zipfile.is_zipfile(tmpfile), '不是zip文件' zf = zipfile.ZipFile(tmpfile) for name in zf.namelist(): # list e.g. ['Brave Browser.url', 'Express VPN.url', 'ssl.txt', 'What is my IP.url'] if callable(callback): # zf.read(name) #读取 if callback(name, zf) is False:# 函数返回false 会终止下一个文件的读取 break ### 例子 def cb(filename,context): if filename.endswith('.txt'): print(context.read(filename).decode('utf-8')) # print( context.read(filename)) return False #终止下一个文件的读取 read_file_for_zip('https://cdn-01.openload.cc/S9Y7m488n8/22c3c58b-1571037628/ssl_proxies.zip',cb)

具体使用见上面例子
兼容大文件url的下载处理

p.s.
在线压缩包读取:
https://extract.me/cn/

参考:

http://www.liujiangblog.com/course/Python/62

https://docs.python.org/2/library/tempfile.html

ISO8601时间字符串到时间戳处理

作者:matrix 发布时间:2019年6月21日星期五 分类:Python 兼容并蓄

之前不太理解ISO8601时间格式,后来看了下网上文章,其实是没有固定的单一格式。
按照下面这些其实都属于ISO8601时间格式:

2019-03-25T16:00:00.000111Z
2019-03-25T16:00:00.111Z
2019-03-25T16:00:00Z
2019-03-25T16:00:00
...

Z表示祖鲁时间Zulu time+0时区,若去掉不写Z则采用系统本地时区。
ISO8601时间还有很多其他扩展格式。

下面代码处理的也就是普通格式

python

import datetime,pytz
def iso2timestamp(datestring, format='%Y-%m-%dT%H:%M:%S.%fZ',timespec='seconds'):
    """
    ISO8601时间转换为时间戳

    :param datestring:iso时间字符串 2019-03-25T16:00:00.000Z,2019-03-25T16:00:00.000111Z
    :param format:%Y-%m-%dT%H:%M:%S.%fZ;其中%f 表示毫秒或者微秒
    :param timespec:返回时间戳最小单位 seconds 秒,milliseconds 毫秒,microseconds 微秒
    :return:时间戳 默认单位秒
    """
    tz = pytz.timezone('Asia/Shanghai')
    utc_time = datetime.datetime.strptime(datestring, format)  # 将字符串读取为 时间 class datetime.datetime

    time = utc_time.replace(tzinfo=pytz.utc).astimezone(tz)

    times = {
        'seconds': int(time.timestamp()),
        'milliseconds': round(time.timestamp() * 1000),
        'microseconds': round(time.timestamp() * 1000 * 1000),
    }
    return times[timespec]


def timestamp2iso(timestamp, format='%Y-%m-%dT%H:%M:%S.%fZ'):
    """
    时间戳转换到ISO8601标准时间(支持微秒级输出 YYYY-MM-DD HH:MM:SS.mmmmmm)
    :param timestamp:时间戳,支持 秒,毫秒,微秒级别
    :param format:输出的时间格式  默认 iso=%Y-%m-%dT%H:%M:%S.%fZ;其中%f表示微秒6位长度

    此函数特殊处理,毫秒/微秒部分 让其支持该部分的字符格式输出
    :return:
    """
    format = format.replace('%f','{-FF-}')#订单处理微秒数据 %f
    length = min(16, len(str(timestamp)))#最多去到微秒级

    #获取毫秒/微秒 数据
    sec = '0'
    if length != 10:#非秒级
        sec = str(timestamp)[:16][-(length - 10):]#最长截取16位长度 再取最后毫秒/微秒数据
    sec = '{:0<6}'.format(sec)#长度位6,靠左剩下的用0补齐
    timestamp = float(str(timestamp)[:10])#转换为秒级时间戳
    return datetime.datetime.utcfromtimestamp(timestamp).strftime(format).replace('{-FF-}',sec)

说明:
之前别个写的iso到时间戳的转换方法简直蛋疼,无参数说明和无法精确到秒级别。
两个函数都可以相互转换和处理。

参考:

https://en.wikipedia.org/wiki/ISO_8601
https://docs.Python.org/zh-cn/3.7/library/datetime.html?highlight=isoformat#strftime-strptime-behavior
https://www.w3.org/TR/NOTE-datetime
https://www.cryptosys.net/pki/manpki/pki_iso8601datetime.html

Python递归中使用协程yield

作者:matrix 发布时间:2019年6月11日星期二 分类:Python

修改递归函数用于遍历目录中文件。

普通操作

def recursive_open_file(path):
    rel = []
    path_dir = os.listdir(path)  # 获取当前路径下的文件名,返回List
    for s in path_dir:
        new_dir = os.path.join(path, s)  # 将文件命加入到当前文件路径后面
        if os.path.isfile(new_dir):  # 如果是文件
            if os.path.splitext(new_dir)[1] == ".txt":  # 判断是否是txt
                rel.append(new_dir)
        else:
            rel = rel + recursive_open_file(new_dir)
    return rel

# print(type(recursive_open_file(dir))) # <class 'list'>

调试yield

想用协程目的是为了想让程序找到相关文件之后中断挂起然后返回数据,避免一次性加载全部资源在内存中。

之前是想的太简单,没用过果真是不知道。

错误版本

def recursive_open_file(path):
    path_dir = os.listdir(path)  # 获取当前路径下的文件名,返回List
    for s in path_dir:
        new_dir = os.path.join(path, s)  # 将文件命加入到当前文件路径后面
        if os.path.isfile(new_dir):  # 如果是文件
            if os.path.splitext(new_dir)[1] == ".txt":  # 判断是否是txt
                yield new_dir
        else:
            yield recursive_open_file(new_dir)

#执行
for i in recursive_open_file(dir):
    print(i) #无法获取文件路径

说明:
yield recursive_open_file(new_dir)返回给外部调用层的数据为<generator object recursive_open_file at 0x10f7765e8> ,不是想要的String!!

正常版本

内部再迭代下就好了 🙈

def recursive_open_file(path):
    path_dir = os.listdir(path)  # 获取当前路径下的文件名,返回List
    for s in path_dir:
        new_dir = os.path.join(path, s)  # 将文件命加入到当前文件路径后面
        if os.path.isfile(new_dir):  # 如果是文件
            if os.path.splitext(new_dir)[1] == ".txt":  # 判断是否是txt
                yield new_dir
        else:
            for i in recursive_open_file(new_dir):
                yield i

PHP协程递归同理


function openDirectory($path) { $dir = dir($path); while (false != ($entry = $dir->read())) { if ($entry != "." && $entry != "..") { $n_path = $path . DIRECTORY_SEPARATOR . $entry; if (is_dir($n_path)) { foreach (openDirectory($n_path) as $i){ yield $i; } } else { yield $n_path; } } } } //调用执行 $dir = '/Users/panc/Desktop/Python/testfile'; foreach (openDirectory($dir) as $item){ print_r($item); print_r("\n"); }

协程send操作

按照廖雪峰的协程教程的生产者-消费者模式:

def consumer():
    r = ''
    while True:
        n = yield r
        if not n:
            return
        print('[CONSUMER] Consuming %s...' % n)
        r = '200 OK'

def produce(c):
    c.send(None)#启动
    n = 0
    while n < 5:
        n = n + 1
        print('[PRODUCER] Producing %s...' % n)
        r = c.send(n)
        print('[PRODUCER] Consumer return: %s' % r)
    c.close()#关闭

c = consumer()
produce(c)

笔记:
c = consumer()不会执行consumer函数,因为内部有yield关键字,consumer函数是生成器generator对象。
通过多个断点调适可以看到yield处的代码会中断执行,然后切换到起调函数的位置继续执行
yield r相似于return返回数据,返回给send()方法返回值。
send(n)操作是把n发送给yield r的返回值
c.send(None)用于启动consumer函数,程序会进入while True循环,在yield处中断

这里yield操作相比较递归遍历那头来说更加麻烦些,因为执行的时候会在两个函数之间相互切换,互相发送数据,需要send方法来启动生成器generator对象。consumer内部因为是while true,所以记得要关闭c.close()

参考:
https://www.liaoxuefeng.com/wiki/1016959663602400/1017968846697824
https://github.com/Earthson/RecGen
https://blog.csdn.net/mieleizhi0522/article/details/82142856

https://blog.51cto.com/xtceetg/1874982

PEACE~

pycharm+pipenv虚拟环境作开发和依赖管理

作者:matrix 发布时间:2019年5月23日星期四 分类:Python

之前使用vagrant来在虚拟机环境中把所有Python模块安装,但是也有一些问题。比如个别时候连接到vagrant虚拟机内部比较慢,还有就是pip3 install安装模块有时候会失败,不能更好的管理依赖包分发项目。

每个项目模块分开管理安装,不会污染本地系统的全局环境,测试和生产的模块都可以用这个来管理分发。

pipenv会在项目中创建相关联的虚拟环境信息以及依赖信息pipfile文件,一起开发的同事可以执行pipenv install操作来安装以及初始化好的pipenv项目,系统会默认安装所需要的依赖环境。

测试环境:pycharm 2018.3.5 for MacOS

安装pipenv

$ pip3 install pipenv

初始化项目

项目目录中执行操作

$ pipenv --python 3.7#指定项目运行python版本 需要系统已经安装好该版本

会创建Python虚拟环境所在目录

✔ Successfully created virtual environment!
Virtualenv location: /Users/用户名/.local/share/virtualenvs/untitled-RaU-esYo

查看虚拟环境信息:

$ pipenv --venv

安装模块

$ pipenv install requests
#也可以指定安装版本 :pipenv install requests==2.22

指定开发环境才安装的模块

$ pipenv install --dev nose2#安装测试环境才用的开发包

项目克隆

项目需要让其他同事克隆就可以直接install来操作

$ pipenv install
#或者使用 pipenv install –dev#安装所有依赖项,包括测试开发包

pycharm配置

1.添加python解释器

2.选择Pipenv 虚拟环境
Base interpreter为本机系统中的python解释器路径
Pipenv executable表示pipenv,命令的环境变量路径

 

3.之后在项目下选择刚刚新建好的Pipenv名称

 

测试运行

配置好pipenv后直接点击右上角的运行图标即可看到命令行窗口的显示

pipenv命令收集

pipenv shell #进入虚拟环境命令行
pipenv --venv#查看虚拟环境路径
pipenv --py#获取虚拟环境python解释器路径
pipenv graph#查看依赖关系
pipenv run python Main.py#使用Pipenv虚拟环境运行Main.py

报错

  • 克隆pipenv环境的时候报错OSError: mysql_config not found
    centos下确保安装mysql-devel等依赖环境
$ yum install mysql-devel gcc gcc-devel python-devel

如果已经安装则考虑是否为软连接不存在导致的not found

ln -s /usr/local/mysql/bin/mysql_config /usr/local/bin/mysql_config

参考:
一条命令解决mysql_config not found
https://blog.csdn.net/weiliu0626/article/details/8444644

  • 若debug调试失败显示类似的红色进程报错
pydev debugger: process 37807 is connecting

删掉项目中隐藏的的.idea目录,然后重启pycharm即可调试变量

参考:
https://www.jianshu.com/p/00af447f0005

https://www.jetbrains.com/help/pycharm/pipenv.html

https://intellij-support.jetbrains.com/hc/en-us/community/posts/205967904-pydev-debugger-is-not-connecting

https://blog.51cto.com/2681882/2164134

Django模板引擎中变量作为属性值调用

作者:matrix 发布时间:2019年5月16日星期四 分类:Python 零零星星

Django默认模板中如果想要调用变量的属性值只能点.字符,也就是value.arg,等同于value["arg"]的调用,不能使用方括号的语法操作,这样的就会导致无法取value[arg]这种变量属性值。

解决办法

1.更换模版引擎。
2.使用自定义template filters模版过滤器

使用模版过滤器setup

1.创建templatetags目录

在项目的模块目录下创建templatetags文件夹(和路由配置urls.py同级)
目录下需要创建__init__.py空白文件

2.创建过滤器py文件

自定义过滤器:dict_value
val.py

from django import template
register = template.Library()

@register.filter(name='dict_value')
def dict_value(value, arg):
    """
    模版变量过滤器 用于调用变量属性的值
    如:$value[$arg]
    :param value:
    :param arg:
    :return:
    """
    return value[arg]

3.模版中使用

模版中需要load操作:

{% load val %} #载入过滤器文件val.py
{{ params_data|dict_value:item|dict_value:'title' }}#使用过滤器

如上调用形式等同于:
params_data[item]['title']

参考:
https://www.v2ex.com/t/66772
https://docs.Djangoproject.com/en/dev/howto/custom-template-tags/
https://blog.csdn.net/lagelangzhi/article/details/54620061

PEACE~