python中的阻塞函数 python ospopen阻塞

python - 日志记录模块(logging)的二次封装上篇文章 对logging做了基本介绍,我们可以使用logging来做日志的简单记录 。但实际项目应用时,我们一般会根据自身需要对其做二次封装(loggingV2),然后在其他python文件中, 先import申明后直接调用 。
废话不多说,下面给几个二次封装的简单示例:
示例一:
loggingV2.py - 封装
logMain.py - 应用
示例二:
对上述示例进行 模块化封装 ,如下log.py
则任何声明了log模块的python文件都可以调用logging日志系统,如下logMain.py
示例三:
对上述示例进行 定制化封装 ,如下myLog.py
需求:
1)同时实现终端显示与日志文件保存
2)日志文件名除日期外,增加显示时间,精确到秒
3)日志输出级别可配置
4)日志保存路径与文件名可配置
5)日志跨天(或者小时/分钟),另生成新文件保存
改写logMain.py,如下:
示例四:
对上述示例进行 异步线程封装,如下myThreadLog.py
需求:
1)独立线程处理日志,不影响主程序性能
2)使用队列异步处理日志记录
继续改写logMain.py , 如下:
注意 - 线程相关操作函数(如下):
1.threading.Thread() — 创建线程并初始化线程,可以为线程传递参数
2.threading.enumerate() — 返回一个包含正在运行的线程的list
3.threading.activeCount(): 返回正在运行的线程数量,与len(threading.enumerate())有相同的结果
4.Thread.start() — 启动线程
5.Thread.join() — 阻塞函数 , 一直等到线程结束
6.Thread.isAlive() — 返回线程活动状态
7.Thread.setName() — 设置线程名
8.Thread.getName() — 获取线程名
9.Thread.setDaemon() — 设置为后台线程,这里默认是False,设置为True之后则主线程不会再等待子线程结束才结束,而是主线程结束意味程序退出,子线程也立即结束,注意调用时必须设置在start()之前;
10.除了以上常用函数,线程还经常与互斥锁Lock/事件Event/信号量Condition/队列Queue等函数配合使用
提高pycharm线程数多线程是指在一个进程中,允许几段代码并发式的同时运行 。Python 的多线程运算就是利用了这一点,可以让代码的运行方法更加丰富有效 。这里需要用到的一个库叫 Threading , 这个库可以直接调用其中的函数 , 或者通过继承类来实现,下面我们来分别通过这两个方法来对运算进行提速 。函数多线程
import threading
def func(times, name, ret):
for i in range(times):
print(name' run: 'str(i))
ret[name] = name" finished with "str(times)" times printed"
【python中的阻塞函数 python ospopen阻塞】return
if __name__ == '__main__':
thread_pool = []
ret = {}
th_1 = threading.Thread(target=func, args=[3, 'th_1', ret], name='th_1')
th_2 = threading.Thread(target=func, args=[5, 'th_2', ret], name='th_2')
thread_pool.append(th_1)
thread_pool.append(th_2)
for th in thread_pool:
th.start()
for th in thread_pool:
th.join()
print(ret)
这里我们首先定义了一个函数,func , 并且运用 threading 库来做多线程式的函数调用 。随后在主程序 main() 内部 , 我们引入了几个多线程运算的核心概念:线程池 Thread Pool: 线程池是管理多线程的一个容器,帮助分配资源,完成线程的生命周期 。你可以自己创建一个容器来管理,或者调用其他容器来管理 。这里的线程池是一个列表 List(), 给予变量 thread_pool 。
线程类 Thread Class: threading.Thread 是一个线程库里的线程类 , 帮助你管理,运行目标函数 。这里的第一个参数 target= 是目标函数,如果你阅读了 多多教Python:Python 基本功: 7. 介绍函数,就会了解函数也可以当做参数被传递;第二个参数是目标函数的参数,这里是一个列表;第三个参数是线程类的名字 , 在你自己定义了之后可以在之后管理线程类的时候用到 。
开始 Start: 在创建了线程类之后,你需要通过 start() 函数来开始运行你的目标函数 。这里通过对线程池列表的循环来一一启动其目标函数 。
非阻塞 Non-Blocking: 线程类的 start() 函数是一个非阻塞函数,意思是当目标函数启动了之后,程序就返回了,而不是等到目标函数结束再返回 。所以这里可以实现对线程池的线程同时启动,而不是等待前一个结束再开始下一个 。
加入 Join: 第二次循环线程池列表的时候,调用了线程类的 join() 函数 。join() 函数会等待目标函数运行结束之后返回,是一个阻塞 (Blocking) 函数 。目前教程介绍的都是阻塞型函数 , 在之后的教程中讲到 异步(Asynchronous)的时候会了解如何写非阻塞函数 。
返回值 Return:如果你想在多线程运算中获得返回值 , 有不同的办法,这里介绍其中一种:利用传入的参数来保存返回值 。这里在参数列表里传入了一个字典,每一个目标函数把自己的返回值写入这个字典 。在 join() 返回之后,就可以查看字典里的返回数值 。
Python中socket里的.recv()函数问题可以通过setsockopt,或者更简单的setblocking,
settimeout设置 。阻塞式的socket的recv服从这样的规则:
当缓冲区内有数据时,立即返回所有的数据;当缓冲区内无数据时,阻塞直到缓冲区中有数据 。非阻塞式的socket的recv服从的规则则是:
当缓冲区内有数据时,立即返回所有的数据;当缓冲区内无数据时 , 产生EAGAIN的错误并返回(在Python中会抛出一个异常) 。两种情况都不会返回空字符串,返回空数据的结果是对方关闭了连接之后才会出现的 。
python2.7怎么实现异步改进之前
之前,我的查询步骤很简单 , 就是:
前端提交查询请求 -- 建立数据库连接 -- 新建游标 -- 执行命令 -- 接受结果 -- 关闭游标、连接
这几大步骤的顺序执行 。
这里面当然问题很大:
建立数据库连接实际上就是新建一个套接字 。这是进程间通信的几种方法里,开销最大的了 。
在“执行命令”和“接受结果”两个步骤中,线程在阻塞在数据库内部的运行过程中 , 数据库连接和游标都处于闲置状态 。
这样一来,每一次查询都要顺序的新建数据库连接 , 都要阻塞在数据库返回结果的过程中 。当前端提交大量查询请求时,查询效率肯定是很低的 。
第一次改进
之前的模块里,问题最大的就是第一步——建立数据库连接套接字了 。如果能够一次性建立连接 , 之后查询能够反复服用这个连接就好了 。
所以,首先应该把数据库查询模块作为一个单独的守护进程去执行 , 而前端app作为主进程响应用户的点击操作 。那么两条进程怎么传递消息呢?翻了几天Python文档,终于构思出来:用队列queue作为生产者(web前端)向消费者(数据库后端)传递任务的渠道 。生产者,会与SQL命令一起,同时传递一个管道pipe的连接对象 , 作为任务完成后,回传结果的渠道 。确保,任务的接收方与发送方保持一致 。
作为第二个问题的解决方法,可以使用线程池来并发获取任务队列中的task,然后执行命令并回传结果 。
第二次改进
第一次改进的效果还是很明显的 , 不用任何测试手段 。直接点击页面链接,可以很直观地感觉到反应速度有很明显的加快 。
但是对于第二个问题,使用线程池还是有些欠妥当 。因为,CPython解释器存在GIL问题,所有线程实际上都在一个解释器进程里调度 。线程稍微开多一点,解释器进程就会频繁的切换线程,而线程切换的开销也不小 。线程多一点 , 甚至会出现“抖动”问题(也就是刚刚唤醒一个线程,就进入挂起状态 , 刚刚换到栈帧或内存的上下文,又被换回内存或者磁盘) , 效率大大降低 。也就是说,线程池的并发量很有限 。
试过了多进程、多线程,只能在单个线程里做文章了 。
Python中的asyncio库
Python里有大量的协程库可以实现单线程内的并发操作,比如Twisted、Gevent等等 。Python官方在3.5版本里提供了asyncio库同样可以实现协程并发 。asyncio库大大降低了Python中协程的实现难度,就像定义普通函数那样就可以了 , 只是要在def前面多加一个async关键词 。async def函数中,需要阻塞在其他async def函数的位置前面可以加上await关键词 。
import asyncio
async def wait():
await asyncio.sleep(2)
async def execute(task):
process_task(task)
await wait()
continue_job()
async def函数的执行稍微麻烦点 。需要首先获取一个loop对象,然后由这个对象代为执行async def函数 。
loop = asyncio.get_event_loop()
loop.run_until_complete(execute(task))
loop.close()
loop在执行execute(task)函数时,如果遇到await关键字,就会暂时挂起当前协程,转而去执行其他阻塞在await关键词的协程,从而实现协程并发 。
不过需要注意的是,run_until_complete()函数本身是一个阻塞函数 。也就是说 , 当前线程会等候一个run_until_complete()函数执行完毕之后,才会继续执行下一部函数 。所以下面这段代码并不能并发执行 。
for task in task_list:
loop.run_until_complete(task)
对与这个问题,asyncio库也有相应的解决方案:gather函数 。
loop = asyncio.get_event_loop()
tasks = [asyncio.ensure_future(execute(task))
for task in task_list]
loop.run_until_complete(asyncio.gather(*tasks))
loop.close()
当然了 , async def函数的执行并不只有这两种解决方案,还有call_soon与run_forever的配合执行等等,更多内容还请参考官方文档 。
Python下的I/O多路复用
协程,实际上,也存在上下文切换,只不过开销很轻微 。而I/O多路复用则完全不存在这个问题 。
目前,Linux上比较火的I/O多路复用API要算epoll了 。Tornado , 就是通过调用C语言封装的epoll库,成功解决了C10K问题(当然还有Pypy的功劳) 。
在Linux里查文档 , 可以看到epoll只有三类函数,调用起来比较方便易懂 。
创建epoll对象 , 并返回其对应的文件描述符(file descriptor) 。
int epoll_create(int size);
int epoll_create1(int flags);
控制监听事件 。第一个参数epfd就对应于前面命令创建的epoll对象的文件描述符;第二个参数表示该命令要执行的动作:监听事件的新增、修改或者删除;第三个参数,是要监听的文件对应的描述符;第四个,代表要监听的事件 。
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
等候 。这是一个阻塞函数,调用者会等候内核通知所注册的事件被触发 。
int epoll_wait(int epfd, struct epoll_event *events,
int maxevents, int timeout);
int epoll_pwait(int epfd, struct epoll_event *events,
int maxevents, int timeout,
const sigset_t *sigmask);
在Python的select库里:
select.epoll()对应于第一类创建函数;
epoll.register() , epoll.unregister(),epoll.modify()均是对控制函数epoll_ctl的封装;
epoll.poll()则是对等候函数epoll_wait的封装 。
Python里epoll相关API的最大问题应该是在epoll.poll() 。相比于其所封装的epoll_wait,用户无法手动指定要等候的事件,也就是后者的第二个参数struct epoll_event *events 。没法实现精确控制 。因此只能使用替代方案:select.select()函数 。
根据Python官方文档 , select.select(rlist, wlist, xlist[, timeout])是对Unix系统中select函数的直接调用,与C语言API的传参很接近 。前三个参数都是列表,其中的元素都是要注册到内核的文件描述符 。如果想用自定义类 , 就要确保实现了fileno()方法 。
其分别对应于:
rlist: 等候直到可读
wlist: 等候直到可写
xlist: 等候直到异常 。这个异常的定义,要查看系统文档 。
select.select(),类似于epoll.poll() , 先注册文件和事件,然后保持等候内核通知,是阻塞函数 。
实际应用
Psycopg2库支持对异步和协程 , 但和一般情况下的用法略有区别 。普通数据库连接支持不同线程中的不同游标并发查询;而异步连接则不支持不同游标的同时查询 。所以异步连接的不同游标之间必须使用I/O复用方法来协调调度 。
所以,我的大致实现思路是这样的:首先并发执行大量协程,从任务队列中提取任务 , 再向连接池请求连接,创建游标,然后执行命令 , 并返回结果 。在获取游标和接受查询结果之前,均要阻塞等候内核通知连接可用 。
其中,连接池返回连接时 , 会根据引用连接的协程数量,返回负载最轻的连接 。这也是自己定义AsyncConnectionPool类的目的 。
我的代码位于:bottle-blog/dbservice.py
存在问题
当然了,这个流程目前还一些问题 。
首先就是每次轮询拿到任务之后,都会走这么一个流程 。
获取连接 -- 新建游标 -- 执行任务 -- 关闭游标 -- 取消连接引用
本来,最好的情况应该是:在轮询之前,就建好游标;在轮询时,直接等候内核通知,执行相应任务 。这样可以减少轮询时的任务量 。但是如果协程提前对应好连接,那就不能保证在获取任务时,保持各连接负载均衡了 。
所以这一块 , 还有工作要做 。
还有就是epoll没能用上,有些遗憾 。
以后打算写点C语言的内容,或者用Python/C API,或者用Ctypes包装共享库,来实现epoll的调用 。
最后,请允许我吐槽一下Python的epoll相关文档:简直太弱了?。。”匦肟丛绰氩拍芘宄δ?。
关于python中的阻塞函数和python ospopen阻塞的介绍到此就结束了,不知道你从中找到你需要的信息了吗 ?如果你还想了解更多这方面的信息,记得收藏关注本站 。

    推荐阅读