Consider the following server implementation:
static const string kAlphabet = "abcdefghijklmnopqrstuvwxyz";
static const useconds_t kDelay = 100000; // 100000 microseconds is 100 ms is 0.1 seconds
static void handleRequest(int client) {
sockbuf sb(client);
iosockstream ss(&sb);
for (size_t i = 0; i < kAlphabet.size(); i++) {
ss << kAlphabet[i] << flush;
usleep(kDelay);
}
}
static const unsigned short kSlowAlphabetServerPort = 41411;

/**
 * Function: main
 * --------------
 * Creates a server socket on kSlowAlphabetServerPort and loops forever,
 * accepting connections and scheduling handleRequest for each accepted
 * client on a ThreadPool (constructed with 128).
 */
int main(int argc, char *argv[]) {
  int server = createServerSocket(kSlowAlphabetServerPort);
  ThreadPool pool(128);
  while (true) {
    int client = accept(server, NULL, NULL);
    // accept returns -1 when it fails; never hand an invalid descriptor
    // to a worker as though it were a connected client.
    if (client == -1) continue;
    pool.schedule([client]() { handleRequest(client); });
  }
  return 0; // never reached: the loop above has no exit
}
Presented here is a traditional (i.e. blocking) client of the slow-alphabet-server:
static const unsigned short kSlowAlphabetServerPort = 41411;

/**
 * Function: main
 * --------------
 * Blocking client of the slow-alphabet-server.  Connects to the server on
 * localhost, reads one byte at a time until read reports 0, echoes each
 * byte to standard output as it arrives, and finally prints the number of
 * bytes received and the number of reads that delivered them.
 */
int main(int argc, char *argv[]) {
  int client = createClientSocket("localhost", kSlowAlphabetServerPort);
  size_t goodReads = 0;
  size_t byteTotal = 0;
  char ch;
  ssize_t count;
  // A return value of 0 is the only way out of the loop: we are truly done.
  while ((count = read(client, &ch, 1)) != 0) {
    assert(count != -1); // simple sanity check, assume more robust in practice
    goodReads++;
    byteTotal += count;
    cout << ch << flush;
  }
  close(client);
  cout << endl;
  cout << "Alphabet Length: " << byteTotal << " bytes." << endl;
  cout << "Num reads: " << goodReads << endl;
  return 0;
}
myth7> ./slow-alphabet-server &
[1] 7516
myth7> ./blocking-alphabet-client
abcdefghijklmnopqrstuvwxyz
Alphabet Length: 26 bytes.
Num reads: 26
myth7> time ./blocking-alphabet-client
abcdefghijklmnopqrstuvwxyz
Alphabet Length: 26 bytes.
Num reads: 26
0.000u 0.002s 0:02.60 0.0% 0+0k 0+8io 0pf+0w
myth7> kill -KILL 7516
[1] Killed ./slow-alphabet-server
Presented here is a client of the slow-alphabet-server that relies on nonblocking I/O:
static const unsigned short kSlowAlphabetServerPort = 41411;

/**
 * Function: main
 * --------------
 * Nonblocking client of the slow-alphabet-server.  After marking the
 * client socket as nonblocking it calls read in a tight loop, echoing
 * each byte that arrives and counting every call: those that delivered a
 * byte, those that failed because no data was available yet, and the
 * final call that returned 0.  The tallies are printed once the loop ends.
 */
int main(int argc, char *argv[]) {
  int client = createClientSocket("localhost", kSlowAlphabetServerPort);
  setAsNonBlocking(client);
  size_t attempts = 0;
  size_t successes = 0;
  size_t failures = 0;
  size_t byteTotal = 0;
  for (;;) {
    char ch;
    ssize_t count = read(client, &ch, 1);
    attempts++; // counted before any classification, so the final read is included
    if (count == 0) break; // we are truly done
    if (count < 0) {
      // The only failure tolerated is "nothing to read right now".
      assert(errno == EWOULDBLOCK || errno == EAGAIN);
      failures++;
      continue;
    }
    successes++;
    byteTotal += count;
    cout << ch << flush;
  }
  close(client);
  cout << endl;
  cout << "Alphabet Length: " << byteTotal << " bytes." << endl;
  cout << "Num reads: " << attempts << " (" << successes << " successful, " << failures << " unsuccessful)." << endl;
  return 0;
}
Look at the output of non-blocking-alphabet-client:
myth7> ./slow-alphabet-server &
[1] 9801
myth7> ./non-blocking-alphabet-client
abcdefghijklmnopqrstuvwxyz
Alphabet Length: 26 bytes.
Num reads: 11394590 (26 successful, 11394563 unsuccessful).
myth7> time ./non-blocking-alphabet-client
abcdefghijklmnopqrstuvwxyz
Alphabet Length: 26 bytes.
Num reads: 11268991 (26 successful, 11268964 unsuccessful).
0.399u 2.202s 0:02.60 99.6% 0+0k 0+0io 0pf+0w
myth7> kill -KILL 9801
myth7>
[1] Killed ./slow-alphabet-server
myth7>
/**
 * Class: OutboundFile
 * -------------------
 * Public interface only.  An OutboundFile is default-constructed, told
 * via initialize which file to send and which descriptor to send it to,
 * and then driven by calling sendMoreData over and over.
 */
class OutboundFile {
 public:
  OutboundFile();

  // source names the file to send; sink is the descriptor to send it to.
  void initialize(const std::string& source, int sink);

  // Callers in this file keep calling this until it returns false.
  bool sendMoreData();

 private:
  // implementation details omitted for the moment
};
/**
* File: outbound-file-test.cc
* ---------------------------
* Demonstrates how one should use the OutboundFile class
* and can be used to confirm that it works properly.
*/
#include "outbound-file.h"
// Sends this test's own source file to standard output, calling
// sendMoreData again and again until it returns false.
int main(int argc, char *argv[]) {
  OutboundFile obf;
  obf.initialize("outbound-file-test.cc", STDOUT_FILENO);
  bool moreToSend;
  do {
    moreToSend = obf.sendMoreData();
  } while (moreToSend);
  return 0;
}
static const unsigned short kDefaultPort = 12345;
static const string kFileToServe("expensive-server.cc");

/**
 * Function: main
 * --------------
 * Single-threaded static file server.  The server socket is marked as
 * nonblocking, so each pass of the outer loop (1) polls accept once,
 * queuing a new OutboundFile for kFileToServe if a client connected, and
 * (2) gives every queued OutboundFile one sendMoreData call, dropping the
 * ones that return false.  The loop never sleeps or blocks.
 */
int main(int argc, char *argv[]) {
  int serverSocket = createServerSocket(kDefaultPort);
  if (serverSocket == kServerSocketFailure) {
    cerr << "Could not start server.  Port " << kDefaultPort << " is probably in use." << endl;
    return 0;
  }
  setAsNonBlocking(serverSocket);
  cout << "Static file server listening on port " << kDefaultPort << "." << endl;
  list<OutboundFile> outboundFiles;
  size_t numConnections = 0;
  size_t numActiveConnections = 0;
  while (true) {
    int clientSocket = accept(serverSocket, NULL, NULL);
    if (clientSocket == -1) {
      // "No pending connection" may be reported as either code, so accept
      // both, exactly as the nonblocking client does for read.
      assert(errno == EWOULDBLOCK || errno == EAGAIN);
    } else {
      OutboundFile obf;
      obf.initialize(kFileToServe, clientSocket);
      outboundFiles.push_back(obf);
      cout << "Connection #" << ++numConnections << endl;
      cout << "Queue size: " << ++numActiveConnections << endl;
    }
    auto iter = outboundFiles.begin();
    while (iter != outboundFiles.end()) {
      if (iter->sendMoreData()) {
        ++iter;
      } else {
        // erase returns the iterator following the removed element, so the
        // walk continues without skipping anything.
        iter = outboundFiles.erase(iter);
        cout << "Queue size: " << --numActiveConnections << endl;
      }
    }
  }
}
/**
 * Function: setAsNonBlocking
 * --------------------------
 * Turns on O_NONBLOCK for the supplied descriptor while keeping whatever
 * status flags F_GETFL reported.  If the flags cannot be fetched, the
 * function falls back to an empty flag set rather than giving up.  The
 * result of the F_SETFL call is not examined.
 */
void setAsNonBlocking(int descriptor) {
  const int fetched = fcntl(descriptor, F_GETFL);
  const int existing = (fetched == -1) ? 0 : fetched;
  fcntl(descriptor, F_SETFL, existing | O_NONBLOCK);
}

/**
 * Class: OutboundFile
 * -------------------
 * Full declaration: the public interface plus the private state and
 * helpers behind it.
 */
class OutboundFile {
 public:
  OutboundFile();
  void initialize(const std::string& source, int sink);
  bool sendMoreData();

 private:
  int source; // descriptor for the file opened by initialize
  int sink;   // descriptor handed to initialize
  static const size_t kBufferSize = 128;
  char buffer[kBufferSize];
  size_t numBytesAvailable; // reset to 0 by initialize
  size_t numBytesSent;      // reset to 0 by initialize
  bool isSending;           // false after construction, true after initialize

  bool dataReadyToBeSent() const;
  void readMoreData();
  void writeMoreData();
  bool allDataFlushed();
};
// Builds an idle OutboundFile.  Both descriptors start at -1 (no
// descriptor) and both counters at 0, so that an object which has not yet
// been passed through initialize holds no indeterminate values; buffer is
// left unfilled.  Initializers are listed in member declaration order.
OutboundFile::OutboundFile()
    : source(-1), sink(-1), numBytesAvailable(0), numBytesSent(0), isSending(false) {}
/**
 * Method: initialize
 * ------------------
 * Prepares this object to send the file named by source to the
 * descriptor sink.  The file is opened read-only with O_NONBLOCK, sink is
 * marked as nonblocking, and both byte counters are reset to 0.
 *
 * The object enters the sending state only if the file was opened
 * successfully.  When open fails, isSending stays false, so sendMoreData
 * goes directly to allDataFlushed and never reads from descriptor -1.
 */
void OutboundFile::initialize(const string& source, int sink) {
  this->source = open(source.c_str(), O_RDONLY | O_NONBLOCK);
  this->sink = sink;
  setAsNonBlocking(this->sink);
  numBytesAvailable = numBytesSent = 0;
  isSending = (this->source != -1); // open returns -1 on failure
}
/**
 * Method: sendMoreData
 * --------------------
 * Performs one increment of work and reports whether the caller should
 * call again.
 *
 * While isSending is true the answer is always true: if no data is ready
 * to be sent, readMoreData is given one chance to supply some, and
 * writeMoreData runs only if data is ready afterwards.  Once isSending is
 * false, the answer is the negation of allDataFlushed.
 */
bool OutboundFile::sendMoreData() {
  if (isSending) {
    bool ready = dataReadyToBeSent();
    if (!ready) {
      readMoreData();
      ready = dataReadyToBeSent();
    }
    if (ready) writeMoreData();
    return true;
  }
  return !allDataFlushed();
}