关注微信公众号:Linux内核拾遗
前面介绍了Linux Workqueue的其中一种实现方式——使用全局工作队列。在这种实现方式下,开发者无需创建任何workqueue或者worker线程,只需要初始化任务(work)即可。
本文介绍另一种Linux Workqueue的实现方式,即创建自定义的工作队列。
1 struct workqueue_struct
struct workqueue_struct
是Linux内核中用于表示工作队列的结构体。工作队列允许将需要延迟执行的工作从中断上下文移到进程上下文中执行,这样可以避免在中断上下文中进行复杂和耗时的操作。
struct workqueue_struct
的定义在内核源代码中的 include/linux/workqueue.h
文件中。以下是简化的结构体定义:
struct workqueue_struct {
struct list_headlist;/* PR: list of all workqueues */
char name[WQ_NAME_LEN]; /* I: workqueue name */
struct pool_workqueue __percpu *cpu_pwqs; /* I: per-cpu pwqs */
...
};
- list:用于将多个工作队列链接在一起。
- name:工作队列的名称。
- cpu_wq:指向每个CPU的工作队列结构的指针。工作队列通常是按CPU分布的,以便于在多处理器系统上并行执行。
另外,struct work_struct
表示工作/任务本身,它需要被添加到工作队列struct workqueue_struct
中才会被调度和执行。
2 创建/销毁工作队列
可以使用create_workqueue宏来创建一个workqueue,结果返回struct workqueue_struct
的引用,随后可以使用destroy_workqueue宏来销毁该workqueue:
struct workqueue_struct *create_workqueue( name );
void destroy_workqueue( struct workqueue_struct * );
此外,可以使用create_singlethread_workqueue函数来创建一个单线程的Workqueue,其中的所有工作项将在同一个内核线程中执行,这非常适合于需要按顺序执行任务的情况。
create_workqueue和create_singlethread_workqueue这两个宏底层都是调用alloc_workqueue这个函数。
#define create_workqueue(name)
alloc_workqueue("%s", WQ_MEM_RECLAIM, 1, (name))
#define create_singlethread_workqueue(name)
alloc_workqueue("%s", WQ_UNBOUND | WQ_MEM_RECLAIM, 1, (name))
struct workqueue_struct *alloc_workqueue(const char *fmt,
unsigned int flags,
int max_active, ...);
其中:
- fmt:创建工作队列的printf格式的名称。
- flags:WQ_*参数,用于指定工作队列的行为和属性。
- max_active:最大同时处理的工作项的数量。
下面是一些常用的WQ_*参数:
-
WQ_UNBOUND:工作队列中的工作项由不绑定到任何特定CPU的特殊工作池(unbound worker-pool)处理,该工作队列就像一个简单的执行上下文提供者,没有并发管理,它会尽可能快地启动对工作项的处理。Unbound workqueue牺牲了局部性,但是适合于以下的场景:
- 并发程度波动较大的场景,因为使用Bound workqueue可能会在不同的CPU上创建大量很少使用的工作线程。
- 长时间运行的CPU密集型工作负载,这些负载可以由系统调度程序更好地管理。
- WQ_FREEZABLE:在系统挂起(比如待机或休眠)时,这种工作队列会停止接收新任务,并等待当前的任务完成;在系统恢复时,再继续处理新任务。
- WQ_MEM_RECLAIM:这种工作队列在系统内存非常紧张的时候仍然能够保证有线程执行任务,适合于在内存不足时也要执行的任务,例如内存回收操作。
- WQ_HIGHPRI:高优先级工作队列中的工作项被排入目标CPU的高优先级工作池,并且由具有较高nice级别的工作线程处理。正常和高优先级工作池不会互相影响,因为每个工作池维护其独立的工作线程池,并在其工作线程之间实现并发管理。
- WQ_CPU_INTENSIVE:CPU密集型任务不会妨碍其他任务的执行,确保所有任务都能被调度执行。
3 往工作队列添加工作项
工作项struct work_struct
初始化完成后,接下来就是要将其添加到工作队列中。Linux内核提供了以下几种方法。
3.1 queue_work
queue_work保证工作队列的任务会尽量在提交任务的那个CPU上执行,以保持任务的局部性(locality)。但是,如果那个CPU不可用(比如宕机或被系统禁用),任务会由其他CPU处理。这种机制可以确保任务不会因为某个CPU不可用而无法执行。
int queue_work( struct workqueue_struct *wq, struct work_struct *work );
3.2 queue_work_on
queue_work_on会将当前任务提交到特定的CPU上执行。
int queue_work_on( int cpu, struct workqueue_struct *wq, struct work_struct *work );
3.3 queue_delayed_work
queue_delayed_work会在任务提交到工作队列之前等待一段时间,由delay参数来指定。
]int queue_delayed_work( struct workqueue_struct *wq,
struct delayed_work *dwork, unsigned long delay );
3.4 queue_delayed_work_on
queue_delayed_work会在任务提交到特定的CPU上执行之前等待一段时间,由delay参数来指定。
int queue_delayed_work_on( int cpu, struct workqueue_struct *wq,
struct delayed_work *dwork, unsigned long delay );
4 完整代码演示
kernel_driver.c
#include <linux/kernel.h>
#include <linux/init.h>
#include <linux/module.h>
#include <linux/kdev_t.h>
#include <linux/fs.h>
#include <linux/cdev.h>
#include <linux/device.h>
#include <linux/slab.h>
#include <linux/uaccess.h>
#include <linux/sysfs.h>
#include <linux/kobject.h>
#include <linux/interrupt.h>
#include <asm/io.h>
#include <linux/err.h>
#include <linux/smp.h>
#define IRQ_NO 63
static struct workqueue_struct *own_workqueue;
static void static_wq_fn(struct work_struct *work)
{
pr_info("Static workqueue function called on CPU[%d]\n", smp_processor_id());
}
static DECLARE_WORK(static_work, static_wq_fn);
static void dynanic_wq_fn(struct work_struct *work)
{
pr_info("Dynamic workqueue function called on CPU[%d]\n", smp_processor_id());
}
static struct work_struct dynamic_work;
static irqreturn_t irq_handler(int irq, void *dev_id)
{
pr_info("Shared IRQ[%d]: Interrupt Occurred\n", irq);
queue_work(own_workqueue, &static_work);
queue_work_on(3, own_workqueue, &dynamic_work);
return IRQ_HANDLED;
}
volatile int my_value = 0;
dev_t dev = 0;
static struct class *dev_class;
static struct cdev my_cdev;
struct kobject *kobj_ref;
static ssize_t sysfs_show(struct kobject *kobj,
struct kobj_attribute *attr, char *buf)
{
return sprintf(buf, "%d", my_value);
}
static ssize_t sysfs_store(struct kobject *kobj,
struct kobj_attribute *attr, const char *buf, size_t count)
{
sscanf(buf, "%d", &my_value);
return count;
}
struct kobj_attribute my_attr = __ATTR(my_value, 0660, sysfs_show, sysfs_store);
static ssize_t my_read(struct file *filp,
char __user *buf, size_t len, loff_t *off)
{
generic_handle_irq(IRQ_NO);
return 0;
}
static struct file_operations fops = {
.owner = THIS_MODULE,
.read = my_read,
};
static int __init my_driver_init(void)
{
if ((alloc_chrdev_region(&dev, 0, 1, "my_dev")) < 0)
return -1;
cdev_init(&my_cdev, &fops);
my_cdev.owner = THIS_MODULE;
my_cdev.ops = &fops;
if ((cdev_add(&my_cdev, dev, 1)) < 0)
goto r_class;
if (IS_ERR(dev_class = class_create(THIS_MODULE, "my_class")))
goto r_class;
if (IS_ERR(device_create(dev_class, NULL, dev, NULL, "my_device")))
goto r_device;
kobj_ref = kobject_create_and_add("my_sysfs", kernel_kobj);
if (sysfs_create_file(kobj_ref, &my_attr.attr))
goto r_sysfs;
if (request_irq(IRQ_NO, irq_handler, IRQF_SHARED,
"my_device", (void *)(irq_handler)))
goto irq;
INIT_WORK(&dynamic_work, dynanic_wq_fn);
own_workqueue = create_workqueue("own_wq");
return 0;
irq:
free_irq(IRQ_NO, (void *)(irq_handler));
r_sysfs:
kobject_put(kobj_ref);
sysfs_remove_file(kernel_kobj, &my_attr.attr);
r_device:
device_destroy(dev_class, dev);
class_destroy(dev_class);
r_class:
unregister_chrdev_region(dev, 1);
cdev_del(&my_cdev);
return -1;
}
static void __exit my_driver_exit(void)
{
destroy_workqueue(own_workqueue);
free_irq(IRQ_NO, (void *)(irq_handler));
kobject_put(kobj_ref);
sysfs_remove_file(kernel_kobj, &my_attr.attr);
device_destroy(dev_class, dev);
class_destroy(dev_class);
cdev_del(&my_cdev);
unregister_chrdev_region(dev, 1);
}
module_init(my_driver_init);
module_exit(my_driver_exit);
MODULE_LICENSE("GPL");
MODULE_AUTHOR("feifei <[email protected]>");
MODULE_DESCRIPTION("A simple device driver");
MODULE_VERSION("1.14");
代码编译运行,结果如下:
关注微信公众号:Linux内核拾遗