bio模块介绍
redis 源码中 bio.c 是一个重要的模块,它实现了后台 IO(background IO) 机制,多线程异步执行一些耗时的操作,避免阻塞主线程。
什么操作需要BIO
static char* bio_worker_title[] = {
"bio_close_file",
"bio_aof",
"bio_lazy_free",
};
AOF 刷盘操作
BIO_WORKER_AOF_FSYNC : aof 操作需要定期调用fsync和fdatasync,这类的系统调用比较慢。
关闭文件描述符
BIO_WORKER_CLOSE_FILE:一些客户端关闭链接后、停止 aof 操作的涉及 close文件描述符。
lazy free 机制
BIO_WORKER_LAZY_FREE:释放内存,例如:
- 删除key 的时候,主线程仅仅把 key 从字典中删除,将释放内存的工作交给BIO 线程
- 执行清理 db 命令如
flushdb async时将清理旧字典的工作交给 BIO 线程
完成任务 (COMP_RQ)
为了更好地实现后台任务的同步,Redis 引入了完成请求(COMP_RQ)任务类型。这些任务主要用来通知主线程,某个特定的后台任务已经执行完成。通过这些完成请求,主线程可以阻塞等待后台任务的完成,确保执行的顺序和一致性。
实现原理
每一种任务都对应着一个队列,用来存放待处理的任务,每次有新的任务,就会放到对应的队列中。
static list *bio_jobs[BIO_WORKER_NUM];
每一种任务都会启动一个专门的后台线程,不同任务互不影响
static pthread_t bio_threads[BIO_WORKER_NUM];
再看几个代表性的 API
bioInit
初始化后台线程和队列
/* Initialize the background system, spawning the thread. */
void bioInit(void) {
pthread_attr_t attr;
pthread_t thread;
size_t stacksize;
unsigned long j;
/* Initialization of state vars and objects */
for (j = 0; j < BIO_WORKER_NUM; j++) {
pthread_mutex_init(&bio_mutex[j],NULL);
pthread_cond_init(&bio_newjob_cond[j],NULL);
bio_jobs[j] = listCreate();
}
/* init jobs comp responses */
bio_comp_list = listCreate();
pthread_mutex_init(&bio_mutex_comp, NULL);
/* Create a pipe for background thread to be able to wake up the redis main thread.
* Make the pipe non blocking. This is just a best effort aware mechanism
* and we do not want to block not in the read nor in the write half.
* Enable close-on-exec flag on pipes in case of the fork-exec system calls in
* sentinels or redis servers. */
if (anetPipe(job_comp_pipe, O_CLOEXEC|O_NONBLOCK, O_CLOEXEC|O_NONBLOCK) == -1) {
serverLog(LL_WARNING,
"Can't create the pipe for bio thread: %s", strerror(errno));
exit(1);
}
/* Register a readable event for the pipe used to awake the event loop on job completion */
if (aeCreateFileEvent(server.el, job_comp_pipe[0], AE_READABLE,
bioPipeReadJobCompList, NULL) == AE_ERR) {
serverPanic("Error registering the readable event for the bio pipe.");
}
/* Set the stack size as by default it may be small in some system */
pthread_attr_init(&attr);
pthread_attr_getstacksize(&attr,&stacksize);
if (!stacksize) stacksize = 1; /* The world is full of Solaris Fixes */
while (stacksize < REDIS_THREAD_STACK_SIZE) stacksize *= 2;
pthread_attr_setstacksize(&attr, stacksize);
/* Ready to spawn our threads. We use the single argument the thread
* function accepts in order to pass the job ID the thread is
* responsible for. */
for (j = 0; j < BIO_WORKER_NUM; j++) {
void *arg = (void*)(unsigned long) j;
if (pthread_create(&thread,&attr,bioProcessBackgroundJobs,arg) != 0) {
serverLog(LL_WARNING, "Fatal: Can't initialize Background Jobs. Error message: %s", strerror(errno));
exit(1);
}
bio_threads[j] = thread;
}
}
bioCreateXXXJob
创建某一任务,目前共有bioCreateLazyFreeJob、bioCreateCompRq、bioCreateCloseJob、bioCreateCloseAofJob、bioCreateFsyncJob 这六种。
以下面的bioCreateCloseJob 为例,创建了一个 job 并将其提交。
void bioCreateCloseJob(int fd, int need_fsync, int need_reclaim_cache) {
bio_job *job = zmalloc(sizeof(*job));
job->fd_args.fd = fd;
job->fd_args.need_fsync = need_fsync;
job->fd_args.need_reclaim_cache = need_reclaim_cache;
bioSubmitJob(BIO_CLOSE_FILE, job);
}
bioSubmitJob
加入到对应的队列中并唤醒线程。
void bioSubmitJob(int type, bio_job *job) {
job->header.type = type;
unsigned long worker = bio_job_to_worker[type];
pthread_mutex_lock(&bio_mutex[worker]);
listAddNodeTail(bio_jobs[worker],job);
bio_jobs_counter[type]++;
pthread_cond_signal(&bio_newjob_cond[worker]);
pthread_mutex_unlock(&bio_mutex[worker]);
}
bioProcessBackgroundJobs
后台线程的核心循环任务,主要工作是从队列中取出并执行
...
while(1) {
listNode *ln;
/* The loop always starts with the lock hold. */
if (listLength(bio_jobs[worker]) == 0) {
pthread_cond_wait(&bio_newjob_cond[worker], &bio_mutex[worker]);
continue;
}
/* Get the job from the queue. */
ln = listFirst(bio_jobs[worker]);
job = ln->value;
/* It is now possible to unlock the background system as we know have
* a stand alone job structure to process.*/
pthread_mutex_unlock(&bio_mutex[worker]);
/* Process the job accordingly to its type. */
int job_type = job->header.type;
if (job_type == BIO_CLOSE_FILE) {
if (job->fd_args.need_fsync &&
redis_fsync(job->fd_args.fd) == -1 &&
errno != EBADF && errno != EINVAL)
{
serverLog(LL_WARNING, "Fail to fsync the AOF file: %s",strerror(errno));
}
if (job->fd_args.need_reclaim_cache) {
if (reclaimFilePageCache(job->fd_args.fd, 0, 0) == -1) {
serverLog(LL_NOTICE,"Unable to reclaim page cache: %s", strerror(errno));
}
}
close(job->fd_args.fd);
}