* Refactoring: move sink/source buffering into separate classes.

* Buffer the HashSink.  This speeds up hashing a bit because it
  prevents lots of calls to the hash update functions (e.g. nix-hash
  went from 9.3s to 8.7s of user time on the closure of my
  /var/run/current-system).
This commit is contained in:
Eelco Dolstra 2011-12-15 16:19:53 +00:00
parent a67b8ae224
commit 5a1b9ed0aa
8 changed files with 125 additions and 90 deletions

View File

@ -1103,16 +1103,14 @@ struct HashAndWriteSink : Sink
HashAndWriteSink(Sink & writeSink) : writeSink(writeSink), hashSink(htSHA256) HashAndWriteSink(Sink & writeSink) : writeSink(writeSink), hashSink(htSHA256)
{ {
} }
virtual void operator () virtual void operator () (const unsigned char * data, size_t len)
(const unsigned char * data, unsigned int len)
{ {
writeSink(data, len); writeSink(data, len);
hashSink(data, len); hashSink(data, len);
} }
Hash currentHash() Hash currentHash()
{ {
HashSink hashSinkClone(hashSink); return hashSink.currentHash().first;
return hashSinkClone.finish().first;
} }
}; };
@ -1201,8 +1199,7 @@ struct HashAndReadSource : Source
{ {
hashing = true; hashing = true;
} }
virtual void operator () virtual void operator () (unsigned char * data, size_t len)
(unsigned char * data, unsigned int len)
{ {
readSource(data, len); readSource(data, len);
if (hashing) hashSink(data, len); if (hashing) hashSink(data, len);

View File

@ -57,11 +57,11 @@ struct RefScanSink : Sink
RefScanSink() : hashSink(htSHA256) { } RefScanSink() : hashSink(htSHA256) { }
void operator () (const unsigned char * data, unsigned int len); void operator () (const unsigned char * data, size_t len);
}; };
void RefScanSink::operator () (const unsigned char * data, unsigned int len) void RefScanSink::operator () (const unsigned char * data, size_t len)
{ {
hashSink(data, len); hashSink(data, len);

View File

@ -374,7 +374,7 @@ Path RemoteStore::importPath(bool requireSignature, Source & source)
openConnection(); openConnection();
writeInt(wopImportPath, to); writeInt(wopImportPath, to);
/* We ignore requireSignature, since the worker forces it to true /* We ignore requireSignature, since the worker forces it to true
anyway. */ anyway. */
processStderr(0, &source); processStderr(0, &source);
return readStorePath(from); return readStorePath(from);
} }

View File

@ -306,21 +306,13 @@ HashSink::HashSink(HashType ht) : ht(ht)
start(ht, *ctx); start(ht, *ctx);
} }
HashSink::HashSink(const HashSink & h)
{
ht = h.ht;
bytes = h.bytes;
ctx = new Ctx;
*ctx = *h.ctx;
}
HashSink::~HashSink() HashSink::~HashSink()
{ {
bufPos = 0;
delete ctx; delete ctx;
} }
void HashSink::operator () void HashSink::write(const unsigned char * data, size_t len)
(const unsigned char * data, unsigned int len)
{ {
bytes += len; bytes += len;
update(ht, *ctx, data, len); update(ht, *ctx, data, len);
@ -328,11 +320,21 @@ void HashSink::operator ()
HashResult HashSink::finish() HashResult HashSink::finish()
{ {
flush();
Hash hash(ht); Hash hash(ht);
nix::finish(ht, *ctx, hash.hash); nix::finish(ht, *ctx, hash.hash);
return HashResult(hash, bytes); return HashResult(hash, bytes);
} }
HashResult HashSink::currentHash()
{
flush();
Ctx ctx2 = *ctx;
Hash hash(ht);
nix::finish(ht, ctx2, hash.hash);
return HashResult(hash, bytes);
}
HashResult hashPath( HashResult hashPath(
HashType ht, const Path & path, PathFilter & filter) HashType ht, const Path & path, PathFilter & filter)

View File

@ -91,7 +91,7 @@ string printHashType(HashType ht);
union Ctx; union Ctx;
class HashSink : public Sink class HashSink : public BufferedSink
{ {
private: private:
HashType ht; HashType ht;
@ -102,8 +102,9 @@ public:
HashSink(HashType ht); HashSink(HashType ht);
HashSink(const HashSink & h); HashSink(const HashSink & h);
~HashSink(); ~HashSink();
virtual void operator () (const unsigned char * data, unsigned int len); void write(const unsigned char * data, size_t len);
HashResult finish(); HashResult finish();
HashResult currentHash();
}; };

View File

@ -8,7 +8,16 @@
namespace nix { namespace nix {
void FdSink::operator () (const unsigned char * data, unsigned int len) BufferedSink::~BufferedSink()
{
/* We can't call flush() here, because C++ for some insane reason
doesn't allow you to call virtual methods from a destructor. */
assert(!bufPos);
if (buffer) delete[] buffer;
}
void BufferedSink::operator () (const unsigned char * data, size_t len)
{ {
if (!buffer) buffer = new unsigned char[bufSize]; if (!buffer) buffer = new unsigned char[bufSize];
@ -16,7 +25,7 @@ void FdSink::operator () (const unsigned char * data, unsigned int len)
/* Optimisation: bypass the buffer if the data exceeds the /* Optimisation: bypass the buffer if the data exceeds the
buffer size and there is no unflushed data. */ buffer size and there is no unflushed data. */
if (bufPos == 0 && len >= bufSize) { if (bufPos == 0 && len >= bufSize) {
writeFull(fd, data, len); write(data, len);
break; break;
} }
/* Otherwise, copy the bytes to the buffer. Flush the buffer /* Otherwise, copy the bytes to the buffer. Flush the buffer
@ -29,31 +38,32 @@ void FdSink::operator () (const unsigned char * data, unsigned int len)
} }
void FdSink::flush() void BufferedSink::flush()
{ {
if (fd == -1 || bufPos == 0) return; if (bufPos == 0) return;
writeFull(fd, buffer, bufPos); write(buffer, bufPos);
bufPos = 0; bufPos = 0;
} }
void FdSource::operator () (unsigned char * data, unsigned int len) void FdSink::write(const unsigned char * data, size_t len)
{
writeFull(fd, data, len);
}
BufferedSource::~BufferedSource()
{
if (buffer) delete[] buffer;
}
void BufferedSource::operator () (unsigned char * data, size_t len)
{ {
if (!buffer) buffer = new unsigned char[bufSize]; if (!buffer) buffer = new unsigned char[bufSize];
while (len) { while (len) {
if (!bufPosIn) { if (!bufPosIn) bufPosIn = read(buffer, bufSize);
/* Read as much data as is available (up to the buffer
size). */
checkInterrupt();
ssize_t n = read(fd, (char *) buffer, bufSize);
if (n == -1) {
if (errno == EINTR) continue;
throw SysError("reading from file");
}
if (n == 0) throw EndOfFile("unexpected end-of-file");
bufPosIn = n;
}
/* Copy out the data in the buffer. */ /* Copy out the data in the buffer. */
size_t n = len > bufPosIn - bufPosOut ? bufPosIn - bufPosOut : len; size_t n = len > bufPosIn - bufPosOut ? bufPosIn - bufPosOut : len;
@ -64,7 +74,20 @@ void FdSource::operator () (unsigned char * data, unsigned int len)
} }
void writePadding(unsigned int len, Sink & sink) size_t FdSource::read(unsigned char * data, size_t len)
{
ssize_t n;
do {
checkInterrupt();
n = ::read(fd, (char *) data, bufSize);
} while (n == -1 && errno == EINTR);
if (n == -1) throw SysError("reading from file");
if (n == 0) throw EndOfFile("unexpected end-of-file");
return n;
}
void writePadding(size_t len, Sink & sink)
{ {
if (len % 8) { if (len % 8) {
unsigned char zero[8]; unsigned char zero[8];
@ -103,7 +126,7 @@ void writeLongLong(unsigned long long n, Sink & sink)
void writeString(const string & s, Sink & sink) void writeString(const string & s, Sink & sink)
{ {
unsigned int len = s.length(); size_t len = s.length();
writeInt(len, sink); writeInt(len, sink);
sink((const unsigned char *) s.c_str(), len); sink((const unsigned char *) s.c_str(), len);
writePadding(len, sink); writePadding(len, sink);
@ -118,11 +141,11 @@ void writeStringSet(const StringSet & ss, Sink & sink)
} }
void readPadding(unsigned int len, Source & source) void readPadding(size_t len, Source & source)
{ {
if (len % 8) { if (len % 8) {
unsigned char zero[8]; unsigned char zero[8];
unsigned int n = 8 - (len % 8); size_t n = 8 - (len % 8);
source(zero, n); source(zero, n);
for (unsigned int i = 0; i < n; i++) for (unsigned int i = 0; i < n; i++)
if (zero[i]) throw SerialisationError("non-zero padding"); if (zero[i]) throw SerialisationError("non-zero padding");
@ -162,7 +185,7 @@ unsigned long long readLongLong(Source & source)
string readString(Source & source) string readString(Source & source)
{ {
unsigned int len = readInt(source); size_t len = readInt(source);
unsigned char * buf = new unsigned char[len]; unsigned char * buf = new unsigned char[len];
AutoDeleteArray<unsigned char> d(buf); AutoDeleteArray<unsigned char> d(buf);
source(buf, len); source(buf, len);

View File

@ -11,7 +11,25 @@ namespace nix {
struct Sink struct Sink
{ {
virtual ~Sink() { } virtual ~Sink() { }
virtual void operator () (const unsigned char * data, unsigned int len) = 0; virtual void operator () (const unsigned char * data, size_t len) = 0;
};
/* A buffered abstract sink. */
struct BufferedSink : Sink
{
size_t bufSize, bufPos;
unsigned char * buffer;
BufferedSink(size_t bufSize = 32 * 1024)
: bufSize(bufSize), bufPos(0), buffer(0) { }
~BufferedSink();
void operator () (const unsigned char * data, size_t len);
void flush();
virtual void write(const unsigned char * data, size_t len) = 0;
}; };
@ -20,56 +38,52 @@ struct Source
{ {
virtual ~Source() { } virtual ~Source() { }
/* The callee should store exactly *len bytes in the buffer /* Store exactly len bytes in the buffer pointed to by data.
pointed to by data. It should block if that much data is not It blocks if that much data is not yet available, or throws an
yet available, or throw an error if it is not going to be error if it is not going to be available. */
available. */ virtual void operator () (unsigned char * data, size_t len) = 0;
virtual void operator () (unsigned char * data, unsigned int len) = 0;
}; };
/* A sink that writes data to a file descriptor (using a buffer). */ /* A buffered abstract source. */
struct FdSink : Sink struct BufferedSource : Source
{ {
int fd; size_t bufSize, bufPosIn, bufPosOut;
unsigned int bufSize, bufPos;
unsigned char * buffer; unsigned char * buffer;
FdSink() : fd(-1), bufSize(32 * 1024), bufPos(0), buffer(0) { } BufferedSource(size_t bufSize = 32 * 1024)
: bufSize(bufSize), bufPosIn(0), bufPosOut(0), buffer(0) { }
~BufferedSource();
FdSink(int fd, unsigned int bufSize = 32 * 1024) void operator () (unsigned char * data, size_t len);
: fd(fd), bufSize(bufSize), bufPos(0), buffer(0) { }
~FdSink()
{
flush();
if (buffer) delete[] buffer;
}
void operator () (const unsigned char * data, unsigned int len); /* Store up to len in the buffer pointed to by data, and
return the number of bytes stored. If should block until at
least one byte is available. */
virtual size_t read(unsigned char * data, size_t len) = 0;
};
void flush();
/* A sink that writes data to a file descriptor. */
struct FdSink : BufferedSink
{
int fd;
FdSink() : fd(-1) { }
FdSink(int fd) : fd(fd) { }
~FdSink() { flush(); }
void write(const unsigned char * data, size_t len);
}; };
/* A source that reads data from a file descriptor. */ /* A source that reads data from a file descriptor. */
struct FdSource : Source struct FdSource : BufferedSource
{ {
int fd; int fd;
unsigned int bufSize, bufPosIn, bufPosOut; FdSource() : fd(-1) { }
unsigned char * buffer; FdSource(int fd) : fd(fd) { }
size_t read(unsigned char * data, size_t len);
FdSource() : fd(-1), bufSize(32 * 1024), bufPosIn(0), bufPosOut(0), buffer(0) { }
FdSource(int fd, unsigned int bufSize = 32 * 1024)
: fd(fd), bufSize(bufSize), bufPosIn(0), bufPosOut(0), buffer(0) { }
~FdSource()
{
if (buffer) delete[] buffer;
}
void operator () (unsigned char * data, unsigned int len);
}; };
@ -77,7 +91,7 @@ struct FdSource : Source
struct StringSink : Sink struct StringSink : Sink
{ {
string s; string s;
virtual void operator () (const unsigned char * data, unsigned int len) void operator () (const unsigned char * data, size_t len)
{ {
s.append((const char *) data, len); s.append((const char *) data, len);
} }
@ -88,9 +102,9 @@ struct StringSink : Sink
struct StringSource : Source struct StringSource : Source
{ {
const string & s; const string & s;
unsigned int pos; size_t pos;
StringSource(const string & _s) : s(_s), pos(0) { } StringSource(const string & _s) : s(_s), pos(0) { }
virtual void operator () (unsigned char * data, unsigned int len) virtual void operator () (unsigned char * data, size_t len)
{ {
s.copy((char *) data, len, pos); s.copy((char *) data, len, pos);
pos += len; pos += len;
@ -100,13 +114,13 @@ struct StringSource : Source
}; };
void writePadding(unsigned int len, Sink & sink); void writePadding(size_t len, Sink & sink);
void writeInt(unsigned int n, Sink & sink); void writeInt(unsigned int n, Sink & sink);
void writeLongLong(unsigned long long n, Sink & sink); void writeLongLong(unsigned long long n, Sink & sink);
void writeString(const string & s, Sink & sink); void writeString(const string & s, Sink & sink);
void writeStringSet(const StringSet & ss, Sink & sink); void writeStringSet(const StringSet & ss, Sink & sink);
void readPadding(unsigned int len, Source & source); void readPadding(size_t len, Source & source);
unsigned int readInt(Source & source); unsigned int readInt(Source & source);
unsigned long long readLongLong(Source & source); unsigned long long readLongLong(Source & source);
string readString(Source & source); string readString(Source & source);

View File

@ -202,8 +202,7 @@ struct TunnelSink : Sink
{ {
Sink & to; Sink & to;
TunnelSink(Sink & to) : to(to) { } TunnelSink(Sink & to) : to(to) { }
virtual void operator () virtual void operator () (const unsigned char * data, size_t len)
(const unsigned char * data, unsigned int len)
{ {
writeInt(STDERR_WRITE, to); writeInt(STDERR_WRITE, to);
writeString(string((const char *) data, len), to); writeString(string((const char *) data, len), to);
@ -215,8 +214,7 @@ struct TunnelSource : Source
{ {
Source & from; Source & from;
TunnelSource(Source & from) : from(from) { } TunnelSource(Source & from) : from(from) { }
virtual void operator () virtual void operator () (unsigned char * data, size_t len)
(unsigned char * data, unsigned int len)
{ {
/* Careful: we're going to receive data from the client now, /* Careful: we're going to receive data from the client now,
so we have to disable the SIGPOLL handler. */ so we have to disable the SIGPOLL handler. */
@ -267,7 +265,7 @@ struct SavingSourceAdapter : Source
Source & orig; Source & orig;
string s; string s;
SavingSourceAdapter(Source & orig) : orig(orig) { } SavingSourceAdapter(Source & orig) : orig(orig) { }
void operator () (unsigned char * data, unsigned int len) void operator () (unsigned char * data, size_t len)
{ {
orig(data, len); orig(data, len);
s.append((const char *) data, len); s.append((const char *) data, len);