这篇文章是 《读厚<编程珠玑>》系列博客 的第一篇,我们在《编程珠玑》的第一章 - 开篇中就了解了位向量是什么,《编程珠玑》的作者使用位向量来解决了一个海量数据排序问题,这篇文章我们来深入的了解一下位向量的实现与应用。
位向量,也叫位图,是一个我们经常可以用到的数据结构,在使用小空间来处理大量数据方面有着得天独厚的优势。位向量,顾名思义就是「位构成的向量」,我们通常使用0来表示 false,1来表示 true,例:[010111] 我们就可以说它是一个位向量,它表示 [false true false true true true]。在位向量这个数据结构中, 我们常常把它的索引和它的值对应起来使用 。
通常我们实现位向量的思路是:使用 基本数据类型 来表示多个位,使用 多个基本数据类型 来构成数组。例如:我们使用「int」类型来实现位向量,一个「int」类型有32位,我们使用 int 数组来存放整个位向量。
下面根据这个思路我们来写一下代码:
#ifndef bitvector_h
#define bitvector_h
#include <stdio.h>
#define N 1000000 //表示位向量元素个数
#define BITPERINT 32 //int 有32位
#define NUM (N-1)/BITPERINT+1
#define SHIFT 5 //2^5=32,表示移位
#define MASK 0x1F //二进制11111
#define SET(i){vector[i >> SHIFT] |= (1 << (i & MASK));} //第i位 置为1
#define CLR(i){vector[i >> SHIFT] &= ~(1 << (i & MASK));} //第i位 置为0
#endif /* bitvector_h */
下面我们来分析一下这段代码,关键的部分就是 SET 和 CLR 这两个宏函数,我们来分解看一下代码:
i >> SHIFT
表示算数右移5位,即 i / 32
,该操作的作用是将数组的索引定位到需要操作的那个 int 的位置上,因为我们的位向量结构是由许多个 int 组成的,例如:如果 i = 50
, i >> SHIFT
等于1,首先将第50位定位到了第2个 int 中。
i & MASK
表示取 i 的最后5位,例: 50 & MASK
等于 10010
,然后把1左移这么多位,即第18位为1
SET
操作是取或运算,即把定位到的 int 中的某位设置为1,例: i = 50
即把第二个 int 的第18位置1。 CLR
操作也是同样的道理。
使用位向量:
int vector[NUM];
int i;
for(i = 0; i < N; i++)
{
CLR(i);
}
SET(1);
SET(20);
问题重述:一个最多包含n个正整数的文件,每个数都小于n,其中n=107,并且没有重复。最多有1MB内存可用。要求用最快方式将它们排序并按升序输出。
解决方案就是:把文件一次读入,出现的数字在位向量对应索引处中标注为1,读取完文件之后,将位向量从低位向高位依次将为1的索引输出即可。
我们知道:在 Linux 中,进程当中的 pid 号的分配是在 0~32767 之间的,其中 0~299 的进程号是分配给守护进程的,剩下的 pid 号是分配给普通进程的。在进程号(pid)的分配中就使用到了位向量。
(以下Linux 内核代码版本为:3.8)
Linux 中用来存放 pid 的位向量结构体叫做 pidmap
,具体代码如下(/include/linux/pid_namespace.h):
struct pidmap {
atomic_t nr_free; //表示未分配的 pid 的个数
void *page; //用数组来表示位向量,每一位表示该同该索引号的 pid 是否被分配(1表示被分配,0表示未分配)
};
下面我们来分析一下有关 pidmap
的操作:
(/include/asm-generic/bitops/atomic.h)
static inline int test_and_set_bit(int nr, volatile unsigned long *addr)
{
unsigned long mask = BIT_MASK(nr);
unsigned long *p = ((unsigned long *)addr) + BIT_WORD(nr);
unsigned long old;
unsigned long flags;
_atomic_spin_lock_irqsave(p, flags);
old = *p;
*p = old | mask;
_atomic_spin_unlock_irqrestore(p, flags);
return (old & mask) != 0;
}
这个函数的作用是:当为一个进程申请到 pid 之后,将对应的pidmap中的 nr
位置位为1的函数,并返回置位之前该位的值。 /*addr
表示的是 pidmap 的地址
下面我们来分析一下具体代码:
在 /include/linux/bitops.h
中我们可以找到 BIT_MASK
的宏定义:
#define BIT_MASK(nr) (1UL << ((nr) % BITS_PER_LONG))
我们同样可以在 include/asm-generic/bitsperlong.h
找到 BITS_PER_LONG
的定义:
#ifdef CONFIG_64BIT
#define BITS_PER_LONG 64
#else
#define BITS_PER_LONG 32
#endif /* CONFIG_64BIT */
我们可以知道 BITS_PER_LONG
是和机器相关的,为32或64.
1UL << (nr) % BITS_PER_LONG)
表示取 nr
的第0~BITS_PER_LONG位,然后把1左移这么多位
我们同样可以在 include/asm-generic/bitsperlong.h
找到 BIT_WORD
的定义:
#define BIT_WORD(nr) ((nr) / BITS_PER_LONG)
它用于定位想要操作的 nr
位在数组中的第几个单位中,因此 unsigned long *p = ((unsigned long *)addr) + BIT_WORD(nr);
这条语句就是将 *p
指向想要操作的位
*p = old | mask;
将第 nr
位置 1
return (old & mask) != 0;
返回置位之前 nr
位的值
(/include/asm-generic/bitops/atomic.h)
static inline int test_and_clear_bit(int nr, volatile unsigned long *addr)
{
unsigned long mask = BIT_MASK(nr);
unsigned long *p = ((unsigned long *)addr) + BIT_WORD(nr);
unsigned long old;
unsigned long flags;
_atomic_spin_lock_irqsave(p, flags);
old = *p;
*p = old & ~mask;
_atomic_spin_unlock_irqrestore(p, flags);
return (old & mask) != 0;
}
与上面不同的操作就是 *p = old & ~mask;
将第 nr
位置 0
unsigned long find_next_zero_bit(const unsigned long *addr, unsigned long size, unsigned long offset)
{//addr 表示 pidmap 的地址,size = PAGE_SIZE*8,offset 一开始为0
unsigned long *p = addr + BITOP_WORD(offset);
unsigned long result = offset & ~(BITS_PER_LONG-1);//如果BITS_PER_LONG为32,则取 offset 后5位
unsigned long tmp;
if (offset >= size)
return size;
size -= result;
offset %= BITS_PER_LONG;//offset位于32位的第几位
if (offset) {//如果 offset 不在数组单位中的第一位
tmp = *(p++);
tmp |= ~0UL >> (BITS_PER_LONG - offset);//将0~offset 位置1
if (size < BITS_PER_LONG)//不足32位
goto found_first;
if (~tmp)//存在0
goto found_middle;
//说明 tmp 为全1:
size -= BITS_PER_LONG;//到下一个单位空间
result += BITS_PER_LONG;//数据减少一个单位大小
}
while (size & ~(BITS_PER_LONG-1)) {
if (~(tmp = *(p++)))
goto found_middle;
result += BITS_PER_LONG;//到下一个单位空间
size -= BITS_PER_LONG;//数据减少一个单位大小
}
if (!size)//size = 0 说明已经全部查完,result = size 返回
return result;
tmp = *p;//size 不是32的整数倍,说明有额外几个 bit,继续查
found_first:
tmp |= ~0UL << size;
if (tmp == ~0UL)
return result + size;
found_middle:
return result + ffz(tmp);
}
static int alloc_pidmap(struct pid_namespace *pid_ns)
{
int i, offset, max_scan, pid, last = pid_ns->last_pid;
struct pidmap *map;
pid = last + 1;
if (pid >= pid_max)
pid = RESERVED_PIDS;//RESERVED_PIDS=300,前300为守护进程
offset = pid & BITS_PER_PAGE_MASK;//pid 在一个单位中的偏移量
map = &pid_ns->pidmap[pid/BITS_PER_PAGE];
max_scan = DIV_ROUND_UP(pid_max, BITS_PER_PAGE) - !offset;
for (i = 0; i <= max_scan; ++i) {
if (unlikely(!map->page)) {
//分配空间
void *page = kzalloc(PAGE_SIZE, GFP_KERNEL);
spin_lock_irq(&pidmap_lock);
if (!map->page) {
map->page = page;
page = NULL;
}
spin_unlock_irq(&pidmap_lock);
kfree(page);
if (unlikely(!map->page))
break;
}
if (likely(atomic_read(&map->nr_free))) {
do {
if (!test_and_set_bit(offset, map->page)) {//offset 位置的 pid 是否被分配
atomic_dec(&map->nr_free);//原子操作,将空闲数量减一
set_last_pid(pid_ns, last, pid);
return pid;//分配该 pid
}
offset = find_next_offset(map, offset);
pid = mk_pid(pid_ns, map, offset);
} while (offset < BITS_PER_PAGE && pid < pid_max);
}
if (map < &pid_ns->pidmap[(pid_max-1)/BITS_PER_PAGE]) {
++map;
offset = 0;
} else {
map = &pid_ns->pidmap[0];
offset = RESERVED_PIDS;
if (unlikely(last == offset))
break;
}
pid = mk_pid(pid_ns, map, offset);
}
return -1;
}
本文的版权归作者罗远航 所有,采用 Attribution-NonCommercial 3.0 License 。任何人可以进行转载、分享,但不可在未经允许的情况下用于商业用途;转载请注明出处。感谢配合!