[pan2: 2/8] intermediate
- From: Heinrich MÃller <henmull src gnome org>
- To: commits-list gnome org
- Cc:
- Subject: [pan2: 2/8] intermediate
- Date: Fri, 21 Dec 2012 22:03:04 +0000 (UTC)
commit 3eb66093d8ba118ae2bd2b59f22e213ceff36bc9
Author: Heinrich MÃller <henmull src gnome org>
Date: Wed Oct 3 11:57:49 2012 +0200
intermediate
configure.in | 5 +
pan/data-impl/data-impl.h | 8 ++-
pan/data-impl/server.cc | 23 +++++
pan/data/data.h | 7 ++-
pan/data/server-info.h | 8 ++
pan/general/Makefile.am | 1 +
pan/gui/Makefile.am | 4 +-
pan/gui/gtk-compat.h | 12 +--
pan/gui/server-ui.cc | 75 +++++++++++++++-
pan/tasks/Makefile.am | 6 +-
pan/tasks/decoder.h | 2 +
pan/tasks/nntp.cc | 33 +++++++
pan/tasks/nntp.h | 16 +++-
pan/tasks/queue.cc | 31 ++++++-
pan/tasks/queue.h | 5 +
pan/tasks/socket.cc | 3 +-
pan/tasks/socket.h | 7 ++
pan/tasks/task-xover.cc | 217 ++++++++++++++++++++++++++++++++++++++++-----
pan/tasks/task-xover.h | 36 +++++++-
pan/tasks/task.h | 16 ++--
20 files changed, 468 insertions(+), 47 deletions(-)
---
diff --git a/configure.in b/configure.in
index 2895646..99c6c60 100644
--- a/configure.in
+++ b/configure.in
@@ -88,6 +88,11 @@ AC_DEFINE_UNQUOTED([GETTEXT_PACKAGE],["$GETTEXT_PACKAGE"],[gettext package name]
AM_GLIB_GNU_GETTEXT
panlocaledir='${prefix}/${DATADIRNAME}/locale'
+dnl libz for xzver
+PKG_CHECK_MODULES([ZLIB], [zlib >= 1.2.0],AC_DEFINE(HAVE_ZLIB,[1],[Zlib for xzver support]),[])
+AC_SUBST(ZLIB_CFLAGS)
+AC_SUBST(ZLIB_LIBS)
+
dnl D-Bus support
AC_ARG_WITH(dbus, AS_HELP_STRING([--with-dbus],[enable D-Bus support (normally: no)]), [want_dbus=$withval], [want_dbus=no])
if test "x$want_dbus" = "xyes" ; then
diff --git a/pan/data-impl/data-impl.h b/pan/data-impl/data-impl.h
index c742269..234d24d 100644
--- a/pan/data-impl/data-impl.h
+++ b/pan/data-impl/data-impl.h
@@ -39,7 +39,6 @@
#include <pan/data/article-cache.h>
#include <pan/data/encode-cache.h>
#include <pan/data/data.h>
-
#include <pan/data-impl/data-io.h>
#include <pan/data-impl/article-filter.h>
#include <pan/data-impl/rules-filter.h>
@@ -105,7 +104,6 @@ namespace pan
ArticleCache _cache;
EncodeCache _encode_cache;
CertStore _certstore;
- Prefs _prefs;
Queue* _queue;
public:
@@ -118,6 +116,7 @@ namespace pan
void rebuild_backend ();
const bool _unit_test;
DataIO * _data_io;
+ Prefs& _prefs;
/**
*** SERVERS
@@ -169,6 +168,9 @@ namespace pan
virtual void set_server_article_expiration_age (const Quark & server,
int days);
+ virtual void set_server_compression_type (const Quark & server,
+ const int setme);
+
virtual void save_server_info (const Quark& server);
public: // accessors
@@ -187,6 +189,8 @@ namespace pan
virtual bool get_server_trust (const Quark & servername, int&) const;
+ virtual bool get_server_compression_type (const Quark & servername, CompressionType&) const;
+
virtual bool get_server_addr (const Quark & server,
std::string & setme_host,
int & setme_port) const;
diff --git a/pan/data-impl/server.cc b/pan/data-impl/server.cc
index 40f27c0..6553b1b 100644
--- a/pan/data-impl/server.cc
+++ b/pan/data-impl/server.cc
@@ -157,6 +157,15 @@ DataImpl :: set_server_trust (const Quark & server,
}
void
+DataImpl :: set_server_compression_type (const Quark & server,
+ const int setme)
+{
+ Server * s (find_server (server));
+ assert (s);
+ s->compression_type = setme;
+}
+
+void
DataImpl :: set_server_addr (const Quark & server,
const StringView & host,
int port)
@@ -286,6 +295,18 @@ DataImpl :: get_server_trust (const Quark & server, int& setme) const
}
bool
+DataImpl :: get_server_compression_type (const Quark & server, CompressionType& setme) const
+{
+ const Server * s (find_server (server));
+ const bool found (s);
+ if (found) {
+ setme = get_compression_type(s->compression_type);
+ }
+
+ return found;
+}
+
+bool
DataImpl :: get_server_addr (const Quark & server,
std::string & setme_host,
int & setme_port) const
@@ -481,6 +502,7 @@ DataImpl :: load_server_properties (const DataIO& source)
s.cert = kv["cert"];
int trust(to_int(kv["trust"], 0));
s.trust = trust;
+ s.compression_type = to_int(kv["compression-type"], 0); // NONE
s.newsrc_filename = kv["newsrc"];
if (s.newsrc_filename.empty()) { // set a default filename
std::ostringstream o;
@@ -545,6 +567,7 @@ else
<< indent(depth) << "<rank>" << s->rank << "</rank>\n"
<< indent(depth) << "<use-ssl>" << s->ssl_support << "</use-ssl>\n"
<< indent(depth) << "<trust>" << s->trust << "</trust>\n"
+ << indent(depth) << "<compression-type>" << s->compression_type << "</compression-type>\n"
<< indent(depth) << "<cert>" << s->cert << "</cert>\n";
*out << indent(--depth) << "</server>\n";
diff --git a/pan/data/data.h b/pan/data/data.h
index 684961d..648f0e3 100644
--- a/pan/data/data.h
+++ b/pan/data/data.h
@@ -27,6 +27,7 @@
#include <pan/general/macros.h>
#include <pan/general/quark.h>
+#include <pan/general/compression.h>
#include <pan/general/string-view.h>
#include <pan/usenet-utils/scorefile.h>
#include <pan/data/article.h>
@@ -34,6 +35,7 @@
#include <pan/data/encode-cache.h>
#include <pan/data/cert-store.h>
#include <pan/data/server-info.h>
+
#include <pan/gui/prefs.h>
#include <pan/gui/progress-view.h>
@@ -242,12 +244,13 @@ namespace pan
int rank;
int ssl_support;
int trust;
+ int compression_type;
typedef sorted_vector<Quark,true,AlphabeticalQuarkOrdering> groups_t;
groups_t groups;
gchar* gkr_pw;
Server(): port(STD_NNTP_PORT), article_expiration_age(31), max_connections(2),
- rank(1), ssl_support(0), trust(0), gkr_pw(NULL) {}
+ rank(1), ssl_support(0), trust(0), gkr_pw(NULL), compression_type(0) /* NONE */ {}
};
protected:
@@ -314,6 +317,8 @@ namespace pan
virtual bool get_server_trust (const Quark & servername, int&) const = 0;
+ virtual bool get_server_compression_type (const Quark & servername, CompressionType&) const = 0;
+
virtual std::string get_server_cert (const Quark & server) const = 0;
virtual int get_server_limits (const Quark & server) const = 0;
diff --git a/pan/data/server-info.h b/pan/data/server-info.h
index 7f4f6a9..d0b407d 100644
--- a/pan/data/server-info.h
+++ b/pan/data/server-info.h
@@ -22,10 +22,12 @@
#include <string>
#include <pan/general/quark.h>
+#include <pan/general/compression.h>
#include <pan/general/string-view.h>
namespace pan
{
+
struct ServerRank
{
virtual ~ServerRank () {}
@@ -38,6 +40,7 @@ namespace pan
*/
class ServerInfo: public ServerRank
{
+
public:
virtual ~ServerInfo () {}
@@ -52,6 +55,9 @@ namespace pan
virtual void set_server_trust (const Quark & servername,
const int setme) = 0;
+ virtual void set_server_compression_type (const Quark & servername,
+ const int setme) = 0;
+
virtual void set_server_addr (const Quark & servername,
const StringView & address,
const int port) = 0;
@@ -81,6 +87,8 @@ namespace pan
virtual bool get_server_trust (const Quark & servername, int&) const = 0;
+ virtual bool get_server_compression_type (const Quark & servername, CompressionType&) const = 0;
+
virtual bool get_server_addr (const Quark & servername,
std::string & setme_address,
int & setme_port) const = 0;
diff --git a/pan/general/Makefile.am b/pan/general/Makefile.am
index 1bd5a1d..b23d480 100644
--- a/pan/general/Makefile.am
+++ b/pan/general/Makefile.am
@@ -17,6 +17,7 @@ libgeneralutils_a_SOURCES = \
worker-pool.cc
noinst_HEADERS = \
+ compression.h \
debug.h \
defgroup.h \
e-util.h \
diff --git a/pan/gui/Makefile.am b/pan/gui/Makefile.am
index ca82ce3..135a075 100644
--- a/pan/gui/Makefile.am
+++ b/pan/gui/Makefile.am
@@ -1,5 +1,5 @@
AM_CPPFLAGS = -I top_srcdir@ @GTKSPELL_CFLAGS@ @ENCHANT_CFLAGS@ @GTK_CFLAGS@ @GMIME_CFLAGS@ @GLIB_CFLAGS@ \
- @GNUTLS_CFLAGS@ @LIBNOTIFY_CFLAGS@ @LIBGNOME_KEYRING_1_CFLAGS@ @WEBKITGTK_CFLAGS@ -DPANLOCALEDIR=\""$(panlocaledir)"\"
+ @GNUTLS_CFLAGS@ @LIBNOTIFY_CFLAGS@ @LIBGNOME_KEYRING_1_CFLAGS@ @WEBKITGTK_CFLAGS -DPANLOCALEDIR=\""$(panlocaledir)"\" @ZLIB_CFLAGS@
noinst_LIBRARIES = libpangui.a
@@ -106,7 +106,7 @@ endif
pan_SOURCES = gui.cc pan.cc $(WINRC)
pan_LDADD = ./libpangui.a $(WINRCOBJ) ../data-impl/libpandata.a ../tasks/libtasks.a ../data/libdata.a ../usenet-utils/libusenetutils.a ../general/libgeneralutils.a ../../uulib/libuu.a \
- @GTKSPELL_LIBS@ @ENCHANT_LIBS@ @GTK_LIBS@ @GMIME_LIBS@ @GLIB_LIBS@ @GNUTLS_LIBS@ @LIBNOTIFY_LIBS@ @LIBGNOME_KEYRING_1_LIBS@ @WEBKITGTK_LIBS@
+ @GTKSPELL_LIBS@ @ENCHANT_LIBS@ @GTK_LIBS@ @GMIME_LIBS@ @GLIB_LIBS@ @GNUTLS_LIBS@ @LIBNOTIFY_LIBS@ @LIBGNOME_KEYRING_1_LIBS@ @WEBKITGTK_LIBS@ @ZLIB_LIBS@
if HAVE_WIN32
pan_LDFLAGS = -mwindows
endif
diff --git a/pan/gui/gtk-compat.h b/pan/gui/gtk-compat.h
index 074ba47..030ade6 100644
--- a/pan/gui/gtk-compat.h
+++ b/pan/gui/gtk-compat.h
@@ -78,7 +78,10 @@ static inline GdkWindow * gdk_window_get_device_position (GdkWindow *window,
gdk_window_get_device_position (event->window, event->device, x, y, t);
#endif
}
-
+ ret = gtk_separator_new(GTK_ORIENTATION_VERTICAL);
+#endif
+ return ret;
+ }
#if !GTK_CHECK_VERSION(2,18,0)
static inline void gtk_widget_get_allocation( GtkWidget *w, GtkAllocation *a)
@@ -186,22 +189,15 @@ static inline GdkWindow * gdk_window_get_device_position (GdkWindow *window,
gtk_widget_modify_font(w,f);
}
#endif
-
#if GTK_CHECK_VERSION(3,0,0)
-// include this for conversion of old key names to new
#include <gdk/gdkkeysyms-compat.h>
-
#define GTK_OBJECT(w) w
- typedef GtkWidget GtkObject;
#endif
static inline void cursor_unref(GdkCursor *p)
- {
#if GTK_CHECK_VERSION(3,0,0)
- g_object_unref(p);
#else
gdk_cursor_unref(p);
-#endif
}
#ifdef __cplusplus
diff --git a/pan/gui/server-ui.cc b/pan/gui/server-ui.cc
index eaa00d9..1a7096f 100644
--- a/pan/gui/server-ui.cc
+++ b/pan/gui/server-ui.cc
@@ -69,7 +69,10 @@ namespace
GtkWidget * rank_combo;
GtkWidget * ssl_combo;
GtkWidget * always_trust_checkbox;
+ GtkWidget * compression_checkbox;
ServerEditDialog (Data& d, Queue& q, Prefs& p): data(d), queue(q), prefs(p) {}
+ CompressionType compressiontype;
+
};
void pan_entry_set_text (GtkWidget * w, const StringView& v)
@@ -103,10 +106,58 @@ namespace
GtkTreeIter iter;
if (gtk_combo_box_get_active_iter (w, &iter))
gtk_tree_model_get (gtk_combo_box_get_model(w), &iter, 1, &ssl, -1);
+ if (ssl==0)
+ {
+ gtk_widget_hide(d->always_trust_checkbox);
+ }
+ else
+ {
+ gtk_widget_show(d->always_trust_checkbox);
+ }
pan_spin_button_set (d->port_spin, ssl == 0 ? STD_NNTP_PORT : STD_SSL_PORT);
#endif
}
+ void address_entry_changed_cb (GtkEditable *editable,
+ gpointer gp)
+ {
+ ServerEditDialog* d (static_cast<ServerEditDialog*>(gp));
+ if (!d) return;
+
+ gchar* text = gtk_editable_get_chars (editable, 0, -1);
+ StringView t(text);
+
+ CompressionType type(HEADER_COMPRESS_NONE);
+
+ // 0 == NONE
+
+ if (t.strstr("astraweb")) // 1
+ {
+ type = HEADER_COMPRESS_XZVER;
+ }
+ if (t.strstr("giganews")) // 2
+ {
+ type = HEADER_COMPRESS_XFEATURE;
+ }
+ char* others[] = {"newshosting", "easynews","usenetserver" };
+ for (int i= 0; i < G_N_ELEMENTS(others); i++)
+ {
+ if (t.strstr(others[i]))
+ {
+ type = HEADER_COMPRESS_DIABLO; // 3
+ break;
+ }
+ }
+ d->compressiontype = type;
+
+ if (type != HEADER_COMPRESS_NONE)
+ gtk_widget_show(d->compression_checkbox);
+ else
+ gtk_widget_hide(d->compression_checkbox);
+
+ gtk_toggle_button_set_active (GTK_TOGGLE_BUTTON(d->compression_checkbox), type != HEADER_COMPRESS_NONE);
+ }
+
void
edit_dialog_populate (Data&, Prefs& prefs, const Quark& server, ServerEditDialog * d)
{
@@ -117,6 +168,7 @@ namespace
d->server = server;
int port(STD_NNTP_PORT), max_conn(4), age(31*3), rank(1), ssl(0), trust(0);
+ CompressionType compression(HEADER_COMPRESS_NONE);
std::string addr, user, cert;
gchar* pass(NULL);
if (!server.empty()) {
@@ -128,13 +180,20 @@ namespace
ssl = d->data.get_server_ssl_support(server);
cert = d->data.get_server_cert(server);
d->data.get_server_trust (server, trust);
+ d->data.get_server_compression_type (server, compression);
}
pan_entry_set_text (d->address_entry, addr);
+ pan_spin_button_set (d->port_spin, port);
pan_spin_button_set (d->connection_limit_spin, max_conn);
pan_entry_set_text (d->auth_username_entry, user);
pan_entry_set_text (d->auth_password_entry, pass);
d->cert = cert;
+ d->compressiontype = compression;
+ std::cerr<<compression<<"\n";
+
+ gtk_toggle_button_set_active (GTK_TOGGLE_BUTTON(d->compression_checkbox), d->compressiontype != HEADER_COMPRESS_NONE);
+
// set the age combobox
GtkComboBox * combo (GTK_COMBO_BOX (d->expiration_age_combo));
@@ -222,6 +281,8 @@ namespace
trust = gtk_toggle_button_get_active(GTK_TOGGLE_BUTTON(d->always_trust_checkbox)) ? 1 : 0;
#endif
+ int header_comp = gtk_toggle_button_get_active(GTK_TOGGLE_BUTTON(d->compression_checkbox)) ? 1 : 0;
+
const char * err_msg (0);
if (addr.empty())
err_msg = _("Please specify the server's address.");
@@ -246,7 +307,7 @@ namespace
d->data.set_server_ssl_support(d->server, ssl);
d->data.set_server_cert(d->server,cert);
d->data.set_server_trust(d->server,trust);
-
+ d->data.set_server_compression_type(d->server, header_comp);
d->data.save_server_info(d->server);
d->queue.upkeep ();
@@ -322,6 +383,7 @@ pan :: server_edit_dialog_new (Data& data, Queue& queue, Prefs& prefs, GtkWindow
GtkWidget * w = d->address_entry = gtk_entry_new ();
gtk_widget_set_tooltip_text( w, _("The news server's actual address, e.g. \"news.mynewsserver.com\""));
HIG::workarea_add_row (t, &row, _("_Address:"), w, NULL);
+ g_signal_connect (w, "changed", G_CALLBACK(address_entry_changed_cb), d);
GtkAdjustment * a = GTK_ADJUSTMENT (gtk_adjustment_new (1.0, 1.0, ULONG_MAX, 1.0, 1.0, 0.0));
w = d->port_spin = gtk_spin_button_new (GTK_ADJUSTMENT(a), 1.0, 0u);
@@ -398,6 +460,10 @@ pan :: server_edit_dialog_new (Data& data, Queue& queue, Prefs& prefs, GtkWindow
gtk_widget_set_tooltip_text( e, _("Fallback servers are used for articles that can't be found on the primaries. One common approach is to use free servers as primaries and subscription servers as fallbacks."));
HIG::workarea_add_row (t, &row, e, w);
+ // header compression options
+ d->compression_checkbox = w = gtk_check_button_new_with_label (_("Enable header compression for speedup"));
+ HIG::workarea_add_row (t, &row, e, w);
+
// ssl 3.0 option
#ifdef HAVE_GNUTLS
// select ssl/plaintext
@@ -442,6 +508,13 @@ pan :: server_edit_dialog_new (Data& data, Queue& queue, Prefs& prefs, GtkWindow
d->server = server;
edit_dialog_populate (data, prefs, server, d);
gtk_widget_show_all (d->dialog);
+
+ // perhaps hide compression checkbox
+ if (d->compressiontype==HEADER_COMPRESS_NONE)
+ {
+ gtk_widget_hide(d->compression_checkbox);
+ }
+
return d->dialog;
}
diff --git a/pan/tasks/Makefile.am b/pan/tasks/Makefile.am
index f120e77..edf1e1b 100644
--- a/pan/tasks/Makefile.am
+++ b/pan/tasks/Makefile.am
@@ -1,11 +1,12 @@
-AM_CPPFLAGS = -I top_srcdir@ @GMIME_CFLAGS@ @GLIB_CFLAGS@ @GNUTLS_CFLAGS@ @GTK_CFLAGS@
+AM_CPPFLAGS = -I top_srcdir@ @GMIME_CFLAGS@ @GLIB_CFLAGS@ @GNUTLS_CFLAGS@ @GTK_CFLAGS@ @ZLIB_CFLAGS@
-AM_LDFLAGS = ../../uulib/libuu.a @GNUTLS_LIBS@ @GTK_LIBS@
+AM_LDFLAGS = ../../uulib/libuu.a @GNUTLS_LIBS@ @ZLIB_LIBS@
noinst_LIBRARIES = libtasks.a
libtasks_a_SOURCES = \
decoder.cc \
+ xzver-decoder.cc \
encoder.cc \
task.cc \
task-article.cc \
@@ -29,6 +30,7 @@ noinst_HEADERS = \
adaptable-set.cc \
adaptable-set.h \
decoder.h \
+ xzver-decoder.h \
encoder.h \
defgroup.h \
health.h \
diff --git a/pan/tasks/decoder.h b/pan/tasks/decoder.h
index 0814694..46c8a53 100644
--- a/pan/tasks/decoder.h
+++ b/pan/tasks/decoder.h
@@ -95,6 +95,8 @@ namespace pan
/** tell our task about the decode's progress */
static gboolean progress_update_timer_func(gpointer decoder);
+ protected:
+
WorkerPool& _worker_pool;
int _gsourceid;
void disable_progress_update();
diff --git a/pan/tasks/nntp.cc b/pan/tasks/nntp.cc
index cf426c4..3ed9e16 100644
--- a/pan/tasks/nntp.cc
+++ b/pan/tasks/nntp.cc
@@ -331,6 +331,39 @@ NNTP :: xover (const Quark & group,
}
void
+NNTP :: xzver (const Quark & group,
+ uint64_t low,
+ uint64_t high,
+ Listener * l)
+{
+ _listener = l;
+
+ if (group != _group)
+ _commands.push_back (build_command ("GROUP %s\r\n", group.c_str()));
+
+ _commands.push_back (build_command ("XZVER %"G_GUINT64_FORMAT"-%"G_GUINT64_FORMAT"\r\n", low, high));
+
+ write_next_command ();
+}
+
+void
+NNTP :: xfeat (const Quark & group,
+ uint64_t low,
+ uint64_t high,
+ Listener * l)
+{
+ _listener = l;
+
+ if (group != _group)
+ _commands.push_back (build_command ("GROUP %s\r\n", group.c_str()));
+
+ _commands.push_back ("XFEATURE COMPRESS GZIP");
+ _commands.push_back (build_command ("XOVER %"G_GUINT64_FORMAT"-%"G_GUINT64_FORMAT"\r\n", low, high));
+
+ write_next_command ();
+}
+
+void
NNTP :: xover_count_only (const Quark & group,
Listener * l)
{
diff --git a/pan/tasks/nntp.h b/pan/tasks/nntp.h
index 95b3ea1..c27cd66 100644
--- a/pan/tasks/nntp.h
+++ b/pan/tasks/nntp.h
@@ -197,6 +197,17 @@ namespace pan
uint64_t low,
uint64_t high,
Listener * l);
+
+ void xzver (const Quark & group,
+ uint64_t low,
+ uint64_t high,
+ Listener * l);
+
+ void xfeat (const Quark & group,
+ uint64_t low,
+ uint64_t high,
+ Listener * l);
+
/**
* Executes an XOVER command: "XOVER" to count
* the xover numbers internally
@@ -307,10 +318,13 @@ namespace pan
const Quark _server;
Quark _group;
- Quark _request_group;
Socket * _socket;
DownloadMeter& _meter;
bool _socket_error;
+ const std::string& get_username()
+ {
+ return _username;
+ }
protected:
diff --git a/pan/tasks/queue.cc b/pan/tasks/queue.cc
index a9a59c8..3f02c24 100644
--- a/pan/tasks/queue.cc
+++ b/pan/tasks/queue.cc
@@ -49,6 +49,7 @@ Queue :: Queue (ServerInfo & server_info,
_socket_creator (socket_creator),
_worker_pool (pool),
_decoder (pool),
+ _xzver_decoder (pool),
_encoder (pool),
_decoder_task (0),
_encoder_task (0),
@@ -236,6 +237,16 @@ Queue :: give_task_a_decoder (Task * task)
}
void
+Queue :: give_task_a_xzver_decoder (Task * task)
+{
+ const bool was_active (task_is_active (task));
+ _decoder_task = task;
+ if (!was_active)
+ fire_task_active_changed (task, true);
+ task->give_decoder (this, static_cast<Decoder*>(&_xzver_decoder)); // it's active now...
+}
+
+void
Queue :: give_task_a_encoder (Task * task)
{
const bool was_active (task_is_active (task));
@@ -333,10 +344,14 @@ Queue :: process_task (Task * task)
}
else if (state._work == Task::NEED_DECODER)
{
-
if (!_decoder_task)
give_task_a_decoder (task);
}
+ else if (state._work == Task::NEED_XZVER_DECODER)
+ {
+ if (!_decoder_task)
+ give_task_a_xzver_decoder (task);
+ }
else if (state._work == Task::NEED_ENCODER)
{
if (!_encoder_task)
@@ -391,6 +406,20 @@ Queue :: find_first_task_needing_decoder ()
}
Task*
+Queue :: find_first_task_needing_xzver_decoder ()
+{
+ foreach (TaskSet, _tasks, it) {
+ const Task::State& state ((*it)->get_state ());
+ if ((state._work == Task::NEED_XZVER_DECODER)
+ && (!_stopped.count (*it))
+ && (!_removing.count (*it)))
+ return *it;
+ }
+
+ return 0;
+}
+
+Task*
Queue :: find_first_task_needing_encoder ()
{
foreach (TaskSet, _tasks, it) {
diff --git a/pan/tasks/queue.h b/pan/tasks/queue.h
index de3be6b..9e23824 100644
--- a/pan/tasks/queue.h
+++ b/pan/tasks/queue.h
@@ -26,6 +26,7 @@
#include <pan/general/macros.h> // for UNUSED
#include <pan/general/map-vector.h>
#include <pan/tasks/decoder.h>
+#include <pan/tasks/xzver-decoder.h>
#include <pan/tasks/encoder.h>
#include <pan/general/quark.h>
#include <pan/tasks/nntp-pool.h>
@@ -49,6 +50,7 @@ namespace pan
class WorkerPool;
struct Encoder;
struct Decoder;
+ struct XZVERDecoder;
/**
* A Queue helper that saves tasks to disk and restores them from disk.
@@ -190,12 +192,14 @@ namespace pan
protected:
void process_task (Task *);
void give_task_a_decoder (Task*);
+ void give_task_a_xzver_decoder (Task*);
void give_task_a_encoder (Task*);
void give_task_a_connection (Task*, NNTP*);
ServerInfo& _server_info;
bool _is_online;
Task* find_first_task_needing_server (const Quark& server);
Task* find_first_task_needing_decoder ();
+ Task* find_first_task_needing_xzver_decoder ();
Task* find_first_task_needing_encoder ();
void give_task_an_upload_slot (TaskUpload* task);
@@ -214,6 +218,7 @@ namespace pan
SocketCreator * _socket_creator;
WorkerPool & _worker_pool;
Decoder _decoder;
+ XZVERDecoder _xzver_decoder;
Encoder _encoder;
Task * _decoder_task;
Task * _encoder_task;
diff --git a/pan/tasks/socket.cc b/pan/tasks/socket.cc
index 2cd12db..3b011ac 100644
--- a/pan/tasks/socket.cc
+++ b/pan/tasks/socket.cc
@@ -34,7 +34,8 @@ Socket :: Socket ():
_bytes_since_last_check (0),
_time_of_last_check (time(0)),
_speed_KiBps (0.0),
- _abort_flag (false)
+ _abort_flag (false),
+ _stream (new std::stringstream())
{
}
diff --git a/pan/tasks/socket.h b/pan/tasks/socket.h
index af26a62..91c8160 100644
--- a/pan/tasks/socket.h
+++ b/pan/tasks/socket.h
@@ -31,6 +31,8 @@ extern "C" {
#include <gnutls/gnutls.h>
#endif
+#include <sstream>
+
namespace pan
{
class StringView;
@@ -61,6 +63,10 @@ namespace pan
virtual void on_socket_bytes_transferred (uint64_t bytes, Socket*) = 0;
};
+ void write(const std::string& str) { *_stream << str; }
+ void clear() { (*_stream).clear(); }
+ std::stringstream*& get_stream() { return _stream; }
+
public:
virtual bool open (const StringView& address, int port, std::string& setme_err) = 0;
virtual void write_command (const StringView& chars, Listener *) = 0;
@@ -79,6 +85,7 @@ namespace pan
mutable time_t _time_of_last_check;
mutable double _speed_KiBps;
bool _abort_flag;
+ std::stringstream* _stream;
public:
diff --git a/pan/tasks/task-xover.cc b/pan/tasks/task-xover.cc
index 483bf38..93d88cb 100644
--- a/pan/tasks/task-xover.cc
+++ b/pan/tasks/task-xover.cc
@@ -32,6 +32,7 @@ extern "C" {
#include <fstream>
#include <iostream>
+#include <pan/general/log.h>
#include <pan/general/debug.h>
#include <pan/general/file-util.h>
#include <pan/general/macros.h>
@@ -40,12 +41,13 @@ extern "C" {
#include <pan/data/data.h>
#include "nntp.h"
#include "task-xover.h"
-
+#include "xzver-decoder.h"
using namespace pan;
namespace
{
+
std::string
get_short_name (const StringView& in)
{
@@ -87,6 +89,16 @@ namespace
}
}
+namespace
+{
+ char* build_cachename (char* buf, size_t len, const char* name)
+ {
+ const char * home(file::get_pan_home().c_str());
+ g_snprintf(buf,len,"%s%c%s%c%s",home, G_DIR_SEPARATOR, "encode-cache", G_DIR_SEPARATOR, name);
+ return buf;
+ }
+}
+
TaskXOver :: TaskXOver (Data & data,
const Quark & group,
Mode mode,
@@ -102,9 +114,14 @@ TaskXOver :: TaskXOver (Data & data,
_bytes_so_far (0),
_parts_so_far (0ul),
_articles_so_far (0ul),
- _total_minitasks (0)
+ _total_minitasks (0),
+ _running_minitasks (0),
+ _decoder(0),
+ _decoder_has_run (false)
{
+
+
debug ("ctor for " << group);
// add a ``GROUP'' MiniTask for each server that has this group
@@ -126,6 +143,12 @@ TaskXOver :: TaskXOver (Data & data,
update_work ();
}
+void
+TaskXOver :: setHigh(const Quark& server, uint64_t& h)
+{
+ _high[server] = h;
+}
+
TaskXOver :: ~TaskXOver ()
{
if (_group_xover_is_reffed) {
@@ -133,6 +156,9 @@ TaskXOver :: ~TaskXOver ()
_data.set_xover_high (_group, it->first, it->second);
_data.xover_unref (_group);
}
+ if (_decoder)
+ _decoder->cancel_silently();
+
_data.fire_group_entered(_group, 1, 0);
}
@@ -141,6 +167,10 @@ TaskXOver :: use_nntp (NNTP* nntp)
{
const Quark& server (nntp->_server);
debug ("got an nntp from " << nntp->_server);
+ CompressionType type;
+ _data.get_server_compression_type(nntp->_server, type);
+
+ _compression_enabled = type != HEADER_COMPRESS_NONE;
// if this is the first nntp we've gotten, ref the xover data
if (!_group_xover_is_reffed) {
@@ -151,7 +181,7 @@ TaskXOver :: use_nntp (NNTP* nntp)
MiniTasks_t& minitasks (_server_to_minitasks[server]);
if (minitasks.empty())
{
- debug ("That's interesting, I got a socket for " << server << " but have no use for it!");
+ debug ("That's interesting, I got a socket for " << server << " but _have no use for it!");
_state._servers.erase (server);
check_in (nntp, OK);
}
@@ -168,7 +198,18 @@ TaskXOver :: use_nntp (NNTP* nntp)
case MiniTask::XOVER:
debug ("XOVER " << mt._low << '-' << mt._high << " to " << server);
_last_xover_number[nntp] = mt._low;
- nntp->xover (_group, mt._low, mt._high, this);
+ if (_compression_enabled)
+ {
+// TODO support XFEATURE!
+// if (type == XZVER)
+ nntp->xzver (_group, mt._low, mt._high, this);
+// if (type == XFEATURE)
+// nntp->xfeat (_group, mt._low, mt._high, this);
+ // TODO diablo
+ }
+ else
+ nntp->xover (_group, mt._low, mt._high, this);
+ --_running_minitasks;
break;
default:
assert (0);
@@ -181,7 +222,6 @@ TaskXOver :: use_nntp (NNTP* nntp)
****
***/
-///TODO show low and high in UI (is this already there?)
void
TaskXOver :: on_nntp_group (NNTP * nntp,
const Quark & group,
@@ -221,15 +261,32 @@ TaskXOver :: on_nntp_group (NNTP * nntp,
if (l <= high)
{
- //std::cerr << LINE_ID << " okay, I'll try to get articles in [" << l << "..." << h << ']' << std::endl;
+// std::cerr << LINE_ID << " okay, I'll try to get articles in [" << l << "..." << h << ']' << std::endl;
+
add_steps (h-l);
- const int INCREMENT(1000);
+
MiniTasks_t& minitasks (_server_to_minitasks[servername]);
- for (uint64_t m=l; m<=h; m+=INCREMENT) {
- MiniTask mt (MiniTask::XOVER, m, m+INCREMENT);
- debug ("adding MiniTask for " << servername << ": xover [" << mt._low << '-' << mt._high << "]");
- minitasks.push_front (mt);
- ++_total_minitasks;
+ if (_compression_enabled)
+ {
+ const int INCREMENT(100000);
+ MiniTasks_t& minitasks (_server_to_minitasks[servername]);
+ for (uint64_t m=l; m<=h; m+=INCREMENT) {
+ MiniTask mt (MiniTask::XOVER, m, m+INCREMENT);
+ debug ("adding MiniTask for " << servername << ": xover [" << mt._low << '-' << mt._high << "]");
+ minitasks.push_front (mt);
+ ++_total_minitasks;
+ }
+ }
+ else
+ {
+ const int INCREMENT(1000);
+ MiniTasks_t& minitasks (_server_to_minitasks[servername]);
+ for (uint64_t m=l; m<=h; m+=INCREMENT) {
+ MiniTask mt (MiniTask::XOVER, m, m+INCREMENT);
+ debug ("adding MiniTask for " << servername << ": xover [" << mt._low << '-' << mt._high << "]");
+ minitasks.push_front (mt);
+ ++_total_minitasks;
+ }
}
}
else
@@ -237,6 +294,7 @@ TaskXOver :: on_nntp_group (NNTP * nntp,
//std::cerr << LINE_ID << " nothing new here..." << std::endl;
_high[nntp->_server] = high;
}
+ _running_minitasks = _total_minitasks;
}
namespace
@@ -298,6 +356,31 @@ void
TaskXOver :: on_nntp_line (NNTP * nntp,
const StringView & line)
{
+ if (_compression_enabled) {
+
+ std::string l(line.str);
+
+ if (line.strstr("=ybegin line=128"))
+ {
+ l += " name=xzver_decoded\n";
+ nntp->_socket->write(l);
+ }
+ else
+ {
+ l += "\n";
+ nntp->_socket->write(l);
+ }
+ }
+ else
+ {
+ on_nntp_line_process (nntp, line);
+ }
+}
+
+void
+TaskXOver :: on_nntp_line_process (NNTP * nntp,
+ const StringView & line)
+{
pan_return_if_fail (nntp != 0);
pan_return_if_fail (!nntp->_server.empty());
@@ -319,6 +402,7 @@ TaskXOver :: on_nntp_line (NNTP * nntp,
//handle multiple "References:"-message-ids correctly. (hack for some faulty servers)
ok = ok && l.pop_token (tmp, '\t');
+ ref += tmp;
do
{
// usenetbucket uses a (null) (sic!) value for an empty reference list. hence the following hack
@@ -359,9 +443,6 @@ TaskXOver :: on_nntp_line (NNTP * nntp,
nntp->_group.c_str(),
number);
- uint64_t& h (_high[nntp->_server]);
- h = std::max (h, number);
-
const char * fallback_charset = NULL; // FIXME
// are we done?
@@ -382,8 +463,11 @@ TaskXOver :: on_nntp_line (NNTP * nntp,
if (article)
++_articles_so_far;
- // emit a status update
+// emit a status update
uint64_t& prev = _last_xover_number[nntp];
+ uint64_t& h (_high[nntp->_server]);
+ h = std::max (h, number);
+
increment_step (number - prev);
prev = number;
if (!(_parts_so_far % 500))
@@ -398,6 +482,15 @@ TaskXOver :: on_nntp_done (NNTP * nntp,
Health health,
const StringView & response UNUSED)
{
+ if (_compression_enabled)
+ {
+ DataStream stream;
+ stream.stream = nntp->_socket->get_stream();
+ stream.group = nntp->_group;
+ stream.server = nntp->_server;
+ _data_streams.push_back(stream);
+ }
+
update_work (true);
check_in (nntp, health);
}
@@ -405,6 +498,7 @@ TaskXOver :: on_nntp_done (NNTP * nntp,
void
TaskXOver :: update_work (bool subtract_one_from_nntp_count)
{
+
int nntp_count (get_nntp_count ());
if (subtract_one_from_nntp_count)
--nntp_count;
@@ -415,16 +509,50 @@ TaskXOver :: update_work (bool subtract_one_from_nntp_count)
if (!it->second.empty())
servers.insert (it->first);
- //std::cerr << LINE_ID << " servers: " << servers.size() << " nntp: " << nntp_count << std::endl;
-
if (!servers.empty())
+ {
_state.set_need_nntp (servers);
+ }
else if (nntp_count)
+ {
_state.set_working ();
- else {
- _state.set_completed();
- set_finished(OK);
}
+ else if (_data_streams.size() != 0 || (!_decoder && !_decoder_has_run)) {
+ _state.set_need_xzverdecoder ();
+ } else if (_decoder_has_run) {
+ _state.set_completed();
+ set_finished (OK);
+ } else if (!_compression_enabled)
+ {
+ _state.set_completed();
+ set_finished (OK);
+ } else assert(0 && "hm, missed a state.");
+}
+
+void
+TaskXOver:: use_decoder (Decoder* decoder)
+{
+ if (_state._work != NEED_XZVER_DECODER)
+ check_in (decoder);
+
+ _decoder = static_cast<XZVERDecoder*>(decoder);
+ _state.set_working();
+
+ DataStream* stream = new DataStream();
+ DataStream& ref(_data_streams.back());
+ stream->stream = ref.stream;
+ stream->group = ref.group;
+ stream->server = ref.server;
+ _data_streams.pop_back();
+ _decoder->enqueue (this, stream, &_data);
+ debug ("decoder thread was free, enqueued work");
+}
+
+void
+TaskXOver :: stop ()
+{
+ if (_decoder)
+ _decoder->cancel();
}
unsigned long
@@ -440,3 +568,50 @@ TaskXOver :: get_bytes_remaining () const
const unsigned long total_bytes = (unsigned long)(_bytes_so_far / percent_done);
return total_bytes - _bytes_so_far;
}
+
+
+void
+TaskXOver :: on_worker_done (bool cancelled)
+{
+ assert(_decoder);
+ if (!_decoder) return;
+
+ if (!cancelled)
+ {
+ // the decoder is done... catch up on all housekeeping
+ // now that we're back in the main thread.
+
+ foreach_const(Decoder::log_t, _decoder->log_severe, it)
+ {
+ Log :: add_err(it->c_str());
+ verbose (it->c_str());
+ }
+ foreach_const(Decoder::log_t, _decoder->log_errors, it)
+ {
+ Log :: add_err(it->c_str());
+ verbose (it->c_str());
+ }
+ foreach_const(Decoder::log_t, _decoder->log_infos, it)
+ {
+ Log :: add_info(it->c_str());
+ verbose (it->c_str());
+ }
+
+ if (!_decoder->log_errors.empty())
+ set_error (_decoder->log_errors.front());
+
+ _state.set_health(_decoder->health);
+
+ if (!_decoder->log_severe.empty())
+ _state.set_health (ERR_LOCAL);
+ else {
+ _state.set_completed();
+ _decoder_has_run = true;
+ }
+ }
+
+ Decoder * d (_decoder);
+ _decoder = 0;
+ update_work ();
+ check_in (d);
+}
diff --git a/pan/tasks/task-xover.h b/pan/tasks/task-xover.h
index d1784de..13d4b97 100644
--- a/pan/tasks/task-xover.h
+++ b/pan/tasks/task-xover.h
@@ -23,20 +23,28 @@
#include <map>
#include <vector>
#include <sstream>
+#include <zlib.h>
#include <pan/data/data.h>
#include <pan/tasks/task.h>
#include <pan/tasks/nntp.h>
+#include <pan/general/worker-pool.h>
#include <fstream>
#include <iostream>
namespace pan
{
+
+ struct XZVERDecoder;
+
/**
* Task for downloading a some or all of a newsgroups' headers
* @ingroup tasks
*/
- class TaskXOver: public Task, private NNTP::Listener
+ class TaskXOver: public Task,
+ private WorkerPool::Worker::Listener,
+ private NNTP::Listener
+
{
public: // life cycle
enum Mode { ALL, NEW, SAMPLE, DAYS };
@@ -45,15 +53,21 @@ namespace pan
public: // task subclass
virtual unsigned long get_bytes_remaining () const;
+ virtual void use_decoder (Decoder*);
+ void stop ();
protected: // task subclass
virtual void use_nntp (NNTP * nntp);
private: // NNTP::Listener
+ void on_nntp_line_process (NNTP*, const StringView&);
virtual void on_nntp_line (NNTP*, const StringView&);
virtual void on_nntp_done (NNTP*, Health, const StringView&);
virtual void on_nntp_group (NNTP*, const Quark&, unsigned long, uint64_t, uint64_t);
+ private: // WorkerPool::Listener interface
+ void on_worker_done (bool cancelled);
+
private: // implementation - minitasks
struct MiniTask {
enum Type { GROUP, XOVER };
@@ -67,6 +81,7 @@ namespace pan
server_to_minitasks_t _server_to_minitasks;
private: // implementation
+ void process_headers (NNTP*);
Data& _data;
const Quark _group;
std::string _short_group_name;
@@ -82,8 +97,27 @@ namespace pan
unsigned long _bytes_so_far;
unsigned long _parts_so_far;
unsigned long _articles_so_far;
+ unsigned long _lines_so_far;
unsigned long _total_minitasks;
+ unsigned long _running_minitasks;
+ bool _compression_enabled;
+ CompressionType _compressiontype;
+
+ struct DataStream
+ {
+ std::stringstream* stream;
+ Quark group;
+ Quark server;
+ };
+
+ std::vector<DataStream> _data_streams;
+
+ friend class XZVERDecoder;
+ XZVERDecoder * _decoder;
+ bool _decoder_has_run;
+ public:
+ void setHigh(const Quark& server, uint64_t& h);
};
}
diff --git a/pan/tasks/task.h b/pan/tasks/task.h
index 9fa2968..0618247 100644
--- a/pan/tasks/task.h
+++ b/pan/tasks/task.h
@@ -50,16 +50,17 @@ namespace pan
enum Work
{
/** Task finished successfully */
- COMPLETED,
+ COMPLETED = 0,
/** Task is waiting on an nntp connection */
- NEED_NNTP,
+ NEED_NNTP = 1,
/** Task waiting for a decoder/encoder */
- NEED_DECODER,
- NEED_ENCODER,
+ NEED_DECODER = 2,
+ NEED_ENCODER = 3,
+ NEED_XZVER_DECODER = 4,
/** Task is running */
- WORKING,
+ WORKING = 5,
/** Task is paused, woken up if 'current_connections < max_connections' */
- PAUSED
+ PAUSED = 6
};
/**
@@ -99,6 +100,9 @@ namespace pan
void set_need_decoder () {
_work = NEED_DECODER; _servers.clear(); }
+ void set_need_xzverdecoder () {
+ _work = NEED_XZVER_DECODER; _servers.clear(); }
+
void set_need_encoder () {
_work = NEED_ENCODER; _servers.clear(); }
[
Date Prev][
Date Next] [
Thread Prev][
Thread Next]
[
Thread Index]
[
Date Index]
[
Author Index]