[pan2] fix gzip giganews feature
- From: Heinrich MÃller <henmull src gnome org>
- To: commits-list gnome org
- Cc:
- Subject: [pan2] fix gzip giganews feature
- Date: Fri, 11 Jan 2013 20:12:41 +0000 (UTC)
commit a66286bff1f1c20f5da5c08b0d91db591fe21b1b
Author: Heinrich MÃller <henmull src gnome org>
Date: Wed Jan 9 19:31:57 2013 +0100
fix gzip giganews feature
pan/general/compression.cc | 80 +++++++++++++++++++++-----------------
pan/general/locking.h | 6 +-
pan/tasks/nntp-pool.cc | 4 +-
pan/tasks/nntp.cc | 35 ++++++----------
pan/tasks/nntp.h | 14 +++---
pan/tasks/socket-impl-gio.cc | 6 ++-
pan/tasks/socket-impl-openssl.cc | 4 +-
pan/tasks/task-groups.cc | 7 ++-
pan/tasks/task-xover.cc | 52 ++++++++----------------
9 files changed, 97 insertions(+), 111 deletions(-)
---
diff --git a/pan/general/compression.cc b/pan/general/compression.cc
index 821c7d3..90a2287 100644
--- a/pan/general/compression.cc
+++ b/pan/general/compression.cc
@@ -185,7 +185,7 @@ compression::ydecode(std::stringstream* in, std::stringstream* out)
char buf1[512], buf2[512], c, *p, *p2 = buf2;
//unsigned int crc1 = 0, crc = _crc32(NULL, 0, 0);
- while (!in->getline(buf1, sizeof(buf1)).eof())
+ while (!in->getline(buf1, sizeof(buf1)).bad())
{
if (gotbeg == 0 && strncmp(buf1, "=ybegin ", 8) == 0)
{
@@ -247,7 +247,7 @@ int
compression::inflate_zlib(std::stringstream *source, std::stringstream *dest,
const CompressionType& compression)
{
- int ret;
+ int ret = Z_DATA_ERROR;
size_t have;
z_stream strm;
char in[MEMCHUNK];
@@ -264,7 +264,7 @@ compression::inflate_zlib(std::stringstream *source, std::stringstream *dest,
ret = inflateInit2(&strm, -MAX_WBITS); /* use -MAX_WBITS to indicate gzip style */
if (compression == HEADER_COMPRESS_XFEATURE
- || compression == HEADER_COMPRESS_DIABLO)
+ || compression == HEADER_COMPRESS_DIABLO)
ret = inflateInit(&strm);
if (ret != Z_OK)
@@ -273,36 +273,37 @@ compression::inflate_zlib(std::stringstream *source, std::stringstream *dest,
/* decompress until deflate stream ends or end of file */
do
{
- strm.avail_in = source->read(in, MEMCHUNK).gcount();
- if (source->bad())
- {
- (void) inflateEnd(&strm);
- return Z_ERRNO;
- }
+ strm.avail_in = source->readsome(in, MEMCHUNK);
+ if (strm.avail_in < 0) strm.avail_in = 0;
+ if (source->fail())
+ {
+ (void) inflateEnd(&strm);
+ return Z_ERRNO;
+ }
if (strm.avail_in == 0)
break;
strm.next_in = (unsigned char*) in;
/* run inflate() on input until output buffer not full */
do
+ {
+ strm.avail_out = MEMCHUNK;
+ strm.next_out = (unsigned char*) out;
+ ret = inflate(&strm, Z_NO_FLUSH);
+ assert(ret != Z_STREAM_ERROR);
+ /* state not clobbered */
+ switch (ret)
{
- strm.avail_out = MEMCHUNK;
- strm.next_out = (unsigned char*) out;
- ret = inflate(&strm, Z_NO_FLUSH);
- assert(ret != Z_STREAM_ERROR);
- /* state not clobbered */
- switch (ret)
- {
case Z_NEED_DICT:
ret = Z_DATA_ERROR; /* and fall through */
case Z_DATA_ERROR:
case Z_MEM_ERROR:
(void) inflateEnd(&strm);
return ret;
- }
- have = MEMCHUNK - strm.avail_out;
- dest->write(out, have);
}
+ have = MEMCHUNK - strm.avail_out;
+ dest->write(out, have);
+ }
while (strm.avail_out == 0);
/* done when inflate() says it's done */
@@ -321,28 +322,35 @@ void compression::inflate_gzip (std::stringstream* stream, std::vector<std::stri
char* buf;
char buf2[4096];
- std::stringstream dest, dest2;
- //line_init(&g_line, stream);
+ line_init(&g_line, stream);
- /*
+ std::stringstream dest, dest2;
while (!g_line.eof) {
- while ((len = line_read(&g_line, &buf)) > 0) {
- buf[len-1] = '\n';
- dest.write(buf, len);
- if (len >= 3 && strncmp(buf + len - 3, ".\r\n", 3) == 0) {
- g_line.eof = 1;
- break;
- }
- }
- }*/
+ while ((len = line_read(&g_line, &buf)) > 0) {
+ buf[len-1] = '\n';
+ dest.write(buf, len);
+ if (len >= 3 && strncmp(buf + len - 1, ".", 1) == 0) {
+ g_line.eof = 1;
+ break;
+ }
+ }
+ }
- std::ofstream out ("/home/imhotep/compression/out");
- out << stream->str();
- out.close();
+ std::cerr<<"inflate : "<<inflate_zlib(&dest, &dest2, HEADER_COMPRESS_XFEATURE)<<"\n\n";
- std::cerr<<"inflate : "<<inflate_zlib(stream, &dest2, HEADER_COMPRESS_XFEATURE)<<"\n\n";
+ std::ofstream out ("/home/imhotep/compression/out");
+ out << dest2.str();
+ out.close();
+ int cnt=0;
while (!dest2.getline(buf2,4096).eof())
- fillme.push_back(std::string(buf2));
+ {if (buf2) fillme.push_back(std::string(buf2));
+ ++cnt;}
+
+ stream->clear();
+
+ std::cerr<<cnt<<"\n";
+
+
}
diff --git a/pan/general/locking.h b/pan/general/locking.h
index c41f9df..2ee0e8c 100644
--- a/pan/general/locking.h
+++ b/pan/general/locking.h
@@ -36,7 +36,7 @@ namespace pan
class Mutex
{
private:
- GMutex mutex;
+ static GMutex mutex;
GMutex * m;
public:
@@ -44,7 +44,7 @@ namespace pan
/** Create a new mutex */
Mutex()
{
-#if !GLIB_CHECK_VERSION(2,34,1)
+#if !GLIB_CHECK_VERSION(2,34,2)
m = g_mutex_new();
#else
g_mutex_init(&mutex);
@@ -55,7 +55,7 @@ namespace pan
/** Destroy the mutex */
virtual ~Mutex()
{
-#if !GLIB_CHECK_VERSION(2,34,1)
+#if !GLIB_CHECK_VERSION(2,34,2)
g_mutex_free(m);
#endif
}
diff --git a/pan/tasks/nntp-pool.cc b/pan/tasks/nntp-pool.cc
index a59bfd1..e8b60d2 100644
--- a/pan/tasks/nntp-pool.cc
+++ b/pan/tasks/nntp-pool.cc
@@ -193,11 +193,11 @@ NNTP_Pool :: on_socket_created (const StringView & host,
{
std::string pw (pass ? pass : "");
if (pass) g_free(pass);
- nntp = new NNTP (_server, user, pw, _meter, socket);
+ nntp = new NNTP (_server, user, pw, _meter, _server_info, socket);
}
else
{
- nntp = new NNTP ( _server, user, pass, _meter, socket);
+ nntp = new NNTP ( _server, user, pass, _meter, _server_info, socket);
}
nntp->handshake (this);
}
diff --git a/pan/tasks/nntp.cc b/pan/tasks/nntp.cc
index 604cfbe..fce8106 100644
--- a/pan/tasks/nntp.cc
+++ b/pan/tasks/nntp.cc
@@ -98,20 +98,20 @@ NNTP :: on_socket_response (Socket * sock UNUSED, const StringView& line_in)
assert (_listener != 0);
if (_listener)
- _listener->on_nntp_line (this, line);
+ _listener->on_nntp_line (this, line_in);
}
if (_compression)
{
- state = CMD_DONE;
+ state = CMD_MORE;
assert (_listener != 0);
if (_listener)
_listener->on_nntp_line (this, line_in);
- if (line_in.len >= 3 && line.str[line.len-1] == '.')
+ if (line_in.len >= 3 && strncmp(line_in.str + line_in.len - 3, ".\r\n", 3) == 0)
{
_compression = false;
_nntp_response_text = false;
- line = "COMPRESS_DONE";
+ line = EOL;
state = CMD_DONE;
}
}
@@ -163,9 +163,15 @@ NNTP :: on_socket_response (Socket * sock UNUSED, const StringView& line_in)
}
case AUTH_ACCEPTED:
- // try to enable compression xfeature
- _socket->write_command (ENABLE_COMPRESS_GZIP, this);
- state = CMD_NEXT;
+ CompressionType ctype;
+ _server_info.get_server_compression_type(_server, ctype);
+ if (ctype == HEADER_COMPRESS_XFEATURE)
+ {
+ // try to enable compression xfeature
+ _socket->write_command (ENABLE_COMPRESS_GZIP, this);
+ state = CMD_NEXT;
+ } else
+ state = CMD_DONE;
break;
case FEATURE_ENABLED:
@@ -368,21 +374,6 @@ NNTP :: xzver (const Quark & group,
write_next_command ();
}
-void
-NNTP :: xfeat (const Quark & group,
- uint64_t low,
- uint64_t high,
- Listener * l)
-{
- _listener = l;
-
- write_next_command();
- _commands.push_back ("XFEATURE COMPRESS GZIP");
- write_next_command();
- xover (group, low, high, l);
-
-}
-
//TODO
void
NNTP :: xover_count_only (const Quark & group,
diff --git a/pan/tasks/nntp.h b/pan/tasks/nntp.h
index e37e7d7..eccab45 100644
--- a/pan/tasks/nntp.h
+++ b/pan/tasks/nntp.h
@@ -76,6 +76,8 @@ namespace
NO_PERMISSION = 502,
FEATURE_NOT_SUPPORTED = 503
};
+
+ const char* EOL = ".";
}
namespace pan
@@ -151,9 +153,11 @@ namespace pan
NNTP (const Quark & server,
const std::string & username,
const std::string & password,
- DownloadMeter & meter,
- Socket * socket):
+ DownloadMeter & meter,
+ ServerInfo & info,
+ Socket * socket):
_server(server),
+ _server_info(info),
_meter(meter),
_socket(socket),
_socket_error(false),
@@ -207,11 +211,6 @@ namespace pan
uint64_t high,
Listener * l);
- void xfeat (const Quark & group,
- uint64_t low,
- uint64_t high,
- Listener * l);
-
/**
* Executes an XOVER command: "XOVER" to count
* the xover numbers internally
@@ -321,6 +320,7 @@ namespace pan
public:
const Quark _server;
+ ServerInfo& _server_info;
Quark _group;
Socket * _socket;
DownloadMeter& _meter;
diff --git a/pan/tasks/socket-impl-gio.cc b/pan/tasks/socket-impl-gio.cc
index b81c0cb..b11e926 100644
--- a/pan/tasks/socket-impl-gio.cc
+++ b/pan/tasks/socket-impl-gio.cc
@@ -334,12 +334,14 @@ GIOChannelSocket :: do_read ()
if (status == G_IO_STATUS_NORMAL)
{
g_string_prepend_len (g, _partial_read.c_str(), _partial_read.size());
+ //TODO validate!
+ _partial_read.clear ();
debug_v ("read [" << g->str << "]"); // verbose debug, if --debug --debug was on the command-line
increment_xfer_byte_count (g->len);
- if (g_str_has_suffix (g->str, "\r\n"))
- g_string_truncate (g, g->len-2);
+ //if (g_str_has_suffix (g->str, "\r\n"))
+ // g_string_truncate (g, g->len-2);
more = _listener->on_socket_response (this, StringView (g->str, g->len));
_listener->on_socket_bytes_transferred(g->len, this);
diff --git a/pan/tasks/socket-impl-openssl.cc b/pan/tasks/socket-impl-openssl.cc
index 229788e..2b33479 100644
--- a/pan/tasks/socket-impl-openssl.cc
+++ b/pan/tasks/socket-impl-openssl.cc
@@ -623,8 +623,8 @@ GIOChannelSocketGnuTLS :: do_read ()
debug_v ("read [" << g->str << "]");
increment_xfer_byte_count (g->len);
- if (g_str_has_suffix (g->str, "\r\n"))
- g_string_truncate (g, g->len-2);
+ //if (g_str_has_suffix (g->str, "\r\n"))
+ // g_string_truncate (g, g->len-2);
more = _listener->on_socket_response (this, StringView (g->str, g->len));
//_listener->on_socket_bytes_transferred(g->len, this);
}
diff --git a/pan/tasks/task-groups.cc b/pan/tasks/task-groups.cc
index 0bc8be7..b8765b0 100644
--- a/pan/tasks/task-groups.cc
+++ b/pan/tasks/task-groups.cc
@@ -80,7 +80,7 @@ TaskGroups :: on_nntp_line (NNTP * nntp,
const StringView & line)
{
if (nntp->_compression)
- stream << line;
+ stream<<line;
else on_nntp_line_process (nntp, line);
}
void
@@ -148,15 +148,16 @@ TaskGroups :: on_nntp_done (NNTP * nntp,
}
else // health is OK or FAIL
{
- if (response == "COMPRESS_DONE")
+
+ if (response == EOL)
{
std::vector<std::string> lines;
compression::inflate_gzip (&stream, lines);
foreach (std::vector<std::string>, lines, it)
on_nntp_line_process (nntp, *it);
- std::cerr<<"len "<<stream.str().length()<<"\n";
}
+
if (_step == LIST_NEWSGROUPS)
{
int i (0);
diff --git a/pan/tasks/task-xover.cc b/pan/tasks/task-xover.cc
index 912fa3b..8bd7491 100644
--- a/pan/tasks/task-xover.cc
+++ b/pan/tasks/task-xover.cc
@@ -289,24 +289,6 @@ namespace
}
}
-/*
- http://tools.ietf.org/html/rfc2980#section-2.8
-
- Each line of output will be formatted with the article number,
- followed by each of the headers in the overview database or the
- article itself (when the data is not available in the overview
- database) for that article separated by a tab character. The
- sequence of fields must be in this order: subject, author, date,
- message-id, references, byte count, and line count. Other optional
- fields may follow line count. Other optional fields may follow line
- count. These fields are specified by examining the response to the
- LIST OVERVIEW.FMT command. Where no data exists, a null field must
- be provided (i.e. the output will have two tab characters adjacent to
- each other). Servers should not output fields for articles that have
- been removed since the XOVER database was created.
-
- */
-
void
TaskXOver::on_nntp_line(NNTP * nntp, const StringView & line)
{
@@ -320,7 +302,7 @@ TaskXOver::on_nntp_line(NNTP * nntp, const StringView & line)
int sock_id = nntp->_socket->get_id();
if (_streams.count(sock_id) == 0)
_streams[sock_id] = new std::stringstream();
- *_streams[sock_id] << line << "\r\n";
+ *_streams[sock_id] << line;
}
else
on_nntp_line_process(nntp, line);
@@ -453,26 +435,28 @@ TaskXOver::on_nntp_done(NNTP * nntp, Health health, const StringView & response)
const Quark& servername(nntp->_server);
CompressionType comp;
_data.get_server_compression_type(servername, comp);
- const bool compression_enabled(comp != HEADER_COMPRESS_NONE);
+ const bool compression_enabled(comp == HEADER_COMPRESS_XZVER || comp == HEADER_COMPRESS_DIABLO || comp == HEADER_COMPRESS_XFEATURE);
- if (response == "." && compression_enabled)
+ if (response == EOL && compression_enabled)
+ {
+ std::stringstream* buffer = _streams[nntp->_socket->get_id()];
+ std::stringstream out, out2;
+ if (comp == HEADER_COMPRESS_XZVER || comp == HEADER_COMPRESS_DIABLO )
{
- std::stringstream* buffer = _streams[nntp->_socket->get_id()];
- std::stringstream out, out2;
- if (comp == HEADER_COMPRESS_XZVER || comp == HEADER_COMPRESS_DIABLO)
- {
compression::ydecode(buffer, &out);
compression::inflate_zlib(&out, &out2, comp);
- }
- else
- compression::inflate_zlib(buffer, &out2, comp);
-
- char buf1[4096];
- while (!out2.getline(buf1, sizeof(buf1)).eof())
- {
- on_nntp_line_process(nntp, buf1);
- }
}
+ else
+ {
+ compression::inflate_zlib(buffer, &out2, comp);
+ }
+
+ buffer->clear();
+
+ char buf1[4096];
+ while (!out2.getline(buf1, sizeof(buf1)).eof())
+ on_nntp_line_process(nntp, buf1);
+ }
update_work(true);
check_in(nntp, health);
}
[
Date Prev][
Date Next] [
Thread Prev][
Thread Next]
[
Thread Index]
[
Date Index]
[
Author Index]