我们可以多快做出特定的 tr?

How fast can we make a specific tr?

我不得不用另一个字符替换文件中的所有空字节(我任意选择 @),并且非常惊讶 tr '[=15=]' '@' 的速度大约是 [= 的 1/4 16=]:

$ pv < lawl | gzip > /dev/null
^C13MiB 0:00:04 [28.5MiB/s] [====>                             ] 17% ETA 0:00:18
$ pv < lawl | tr '[=10=]' '@' > /dev/null
^C58MiB 0:00:08 [7.28MiB/s] [==>                               ]  9% ETA 0:01:20

我的真实数据文件是 3GB 的 gzip 压缩文件,花了 50 分钟 tr,我实际上需要对很多这样的文件执行此操作,所以这不完全是一个学术问题。请注意,从磁盘(这里是相当快的 SSD)或 pv 读取在任何一种情况下都不是瓶颈; gziptr 都使用 100% CPU,cat 快得多:

$ pv < lawl | cat > /dev/null
 642MiB 0:00:00 [1.01GiB/s] [================================>] 100%

此代码:

#include <stdio.h>

int main() {
    int ch;
    while ((ch = getchar()) != EOF) {
        if (ch == '[=12=]') {
            putchar('@');
        } else {
            putchar(ch);
        }
    }
}

clang -O3 编译稍微快一些:

$ pv < lawl | ./stupidtr > /dev/null
^C52MiB 0:00:06 [ 8.5MiB/s] [=>                                ]  8% ETA 0:01:0

gcc -O4 -mtune=native -march=native (4.8.4) 编译是可比的,可能稍微快一点。将 -march=native 添加到 clang (Apple LLVM version 6.1.0 (clang-602.0.53) (based on LLVM 3.6.0svn)) 会生成相同的二进制文件。

这大概只是因为tr中替换的通用处理代码被替换为常量,检查可以编译下来。 LLVM IR (clang -S -O3 stupidtr.c) 看起来很不错。

我想 gzip 一定更快,因为它正在执行 SIMD 指令或其他操作。是否有可能达到 gzip 速度?

一些规范,如果它们相关的话:

您的代码不正确,因为您没有在正确的位置测试文件结尾。这是 do {} while 循环中 非常 的常见错误。我建议完全避免这种构造(除了在宏中将语句序列转换为单个语句)。

同时尝试告诉 glibc 对流执行更少的检查:

#include <stdio.h>

int main() {
    int c;
    while ((c = getchar_unlocked()) != EOF) {
        if (c == '[=10=]')
            c = '@':
        putchar_unlocked(c);
    }
}

您还可以使用不同的缓冲区大小,例如在 while() 循环之前尝试这些:

setvbuf(stdin, NULL, _IOFBF, 1024 * 1024);
setvbuf(stdout, NULL, _IOFBF, 1024 * 1024);

如果您将该实用程序用作带管道的过滤器,应该不会有太大影响,但如果您使用文件,它可能会更有效率。

如果您使用文件,您也可以 mmap 文件并使用 memchr 来查找 '[=16=]' 字节,甚至 strchr 可能会更快而且您可以确保文件末尾有一个`'\0``(放在那里是个好方法)。

您需要使用块读取和写入来提高速度。 (即使使用像 stdio.h 这样的缓冲 I/O 库,管理缓冲区的成本也可能很高。)像:

#include <unistd.h>
int main( void )
{
    char buffer[16384];
    int size, i;
    while ((size = read(0, buffer, sizeof buffer)) > 0) {
        for( i = 0; i < size; ++i ) {
            if (buffer[i] == '[=10=]') {
                buffer[i] = '@';
                // optionally, i += 64; since
                // "Once you've found a null byte, it's also guaranteed that there can't
                // be another one for a hundred bytes or so"
            }
        }
        write(1, buffer, size);
    }
}

自然地,进行优化编译,以便编译器可以在需要时将索引转换为指针算法。

如果您仍然没有达到您的速度目标(或者足够智能的编译器可能会自动矢量化 for 循环),此版本也非常适合 SIMD 优化。

此外,此代码缺乏可靠的错误处理。正如@chqrlie 在评论中提到的,您应该在获得 -EINTR 时重试,并且您应该处理部分写入。

首先,正如其他人指出的那样,不要使用 getchar()/putchar(),甚至不要使用任何 FILE-based 方法,例如 fopen()/fread()/fwrite()。请改用 open()/read()/write()

如果文件已经在磁盘上解压缩,请不要使用管道。如果它被压缩,那么你确实想要使用管道来删除整个 read/write 循环。如果从磁盘解压回磁盘,然后替换NUL字符,数据路径为disk->memory/cpu->disk->memory/cpu->disk。如果使用管道,则路径为disk->memory/cpu->disk。如果您是 disk-limited,那么额外的 read/write 周期将使处理千兆字节(或更多)数据所需的时间增加一倍。

还有一件事 - 考虑到您的 IO 模式和移动的数据量 - 读取整个 multi-GB 文件,写入整个文件 - 页面缓存只会妨碍您。使用直接 IO,因此在 Linux 上的 C 语言中(headers 并且为清楚起见而放弃了强大的错误检查):

#define CHUNK_SIZE ( 1024UL * 1024UL * 4UL )
#define NEW_CHAR '@'
int main( int argc, char **argv )
{
    /* page-aligned buffer */
    char *buf = valloc( CHUNK_SIZE );
    /* just set "in = 0" to read a stdin pipe */
    int in = open( argv[ 1 ], O_RDONLY | O_DIRECT );
    int out = open( argv[ 2 ], O_WRONLY | O_CREAT | O_TRUNC | O_DIRECT, 0644 );

    for ( ;; )
    {
        ssize_t bytes = read( in, buf, CHUNK_SIZE );
        if ( bytes < 0 )
        {
            if ( errno == EINTR )
            {
                continue;
            }
            break;
        }
        else if ( bytes == 0 )
        {
            break;
        }

        for ( int ii = 0; ii < bytes; ii++ )
        {
            if ( !buf[ ii ] )
            {
                buf[ ii ] = NEW_CHAR;
            }
        }
        write( out, buf, bytes );
    }

    close( in );
    close( out );

    return( 0 );
}

尽可能提高编译器优化。要在真实数据上使用此代码,您需要检查 write() 调用的结果 - Linux 上的直接 IO 是一个真正挑剔的野兽。我不得不关闭一个用 O_DIRECT 打开的文件并在没有直接 IO 设置的情况下重新打开它,以便在最后一位不是 a 的倍数时将文件的最后一个字节写入 Linux整页。

如果你想走得更快,你可以多线程处理 - 一个线程读取,一个线程转换字符,另一个线程写入。使用尽可能多的缓冲区,根据需要将它们从一个线程传递到另一个线程,以使进程中最慢的部分始终保持忙碌。

如果您真的想知道移动数据的速度有多快,也可以多线程 reads/writes。如果您的文件系统支持它,请使用异步 read/write.

将各种答案的想法与一些额外的技巧相结合,这是一个优化版本:

#include <errno.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>

#define BUFFER_SIZE  16384
#define REPLACE_CHAR  '@'

int main(void) {
    /* define buffer as uint64_t to force alignment */
    /* make it one slot longer to allow for loop guard */
    uint64_t buffer[BUFFER_SIZE/8 + 1];
    ssize_t size, chunk;
    uint64_t *p, *p_end;
    uint64_t rep8 = (uint8_t)REPLACE_CHAR * 0x0101010101010101ULL;

    while ((size = read(0, buffer, BUFFER_SIZE)) != 0) {
        if (size < 0) {
            if (errno == EINTR) continue;
            fprintf(stderr, "read error: %s\n", strerror(errno));
            return 1;
        }
        p = buffer;
        p_end = p + ((size + 7) >> 3);
        *p_end = 0ULL; /* force a 0 at the end */
        for (;; p++) {
#define LOWBITS   0x0101010101010101ULL
#define HIGHBITS  0x8080808080808080ULL
            uint64_t m = ((*p - LOWBITS) & ~*p & HIGHBITS);
            if (m != 0) {
                if (p >= p_end) break;
                m |= m >> 1;
                m |= m >> 2;
                m |= m >> 4;
                *p |= m & rep8;
            }
        }
        for (unsigned char *pc = (unsigned char *)buffer;
             (chunk = write(1, pc, (size_t)size)) != size;
             pc += chunk, size -= chunk) {
            if (chunk < 0) {
                if (errno == EINTR) continue;
                fprintf(stderr, "write error: %s\n", strerror(errno));
                return 2;
            }
        }
    }
    return 0;
}