diff options
author | Lennart Poettering <lennart@poettering.net> | 2016-11-15 19:32:50 +0100 |
---|---|---|
committer | Lennart Poettering <lennart@poettering.net> | 2016-11-16 15:03:26 +0100 |
commit | c5a97ed132b400ad82f7939d55fe1027a2b13f6e (patch) | |
tree | 924df71be2f04a66a30faccc54354d34c94f42ff | |
parent | 1a465207ab0a0b6756ab0d9305102d9159955a14 (diff) |
core: GC redundant device jobs from the run queue
In contrast to all other unit types device units when queued just track
external state, they cannot effect state changes on their own. Hence unless a
client or other job waits for them there's no reason to keep them in the job
queue. This adds a concept of GC'ing jobs of this type as soon as no client or
other job waits for them anymore.
To ensure this works correctly we need to track which clients actually
reference a job (i.e. which ones enqueued it). Unfortunately that's pretty
nasty to do for direct connections, as sd_bus_track doesn't work for
them. For now, work around this, by simply remembering in a boolean that a job
was requested by a direct connection, and reset it when we notice the direct
connection is gone. This means the GC logic works fine, except that jobs are
not immediately removed when direct connections disconnect.
In the longer term, a rework of the bus logic should fix this properly. For now
this should be good enough, as GC works for fine all cases except this one, and
thus is a clear improvement over the previous behaviour.
Fixes: #1921
-rw-r--r-- | src/core/dbus-job.c | 68 | ||||
-rw-r--r-- | src/core/dbus-job.h | 3 | ||||
-rw-r--r-- | src/core/dbus-unit.c | 25 | ||||
-rw-r--r-- | src/core/device.c | 2 | ||||
-rw-r--r-- | src/core/job.c | 114 | ||||
-rw-r--r-- | src/core/job.h | 7 | ||||
-rw-r--r-- | src/core/manager.c | 41 | ||||
-rw-r--r-- | src/core/manager.h | 5 | ||||
-rw-r--r-- | src/core/unit.c | 4 | ||||
-rw-r--r-- | src/core/unit.h | 3 | ||||
-rw-r--r-- | src/shared/bus-unit-util.c | 4 |
11 files changed, 244 insertions, 32 deletions
diff --git a/src/core/dbus-job.c b/src/core/dbus-job.c index e8c69ed3e4..7888c163f1 100644 --- a/src/core/dbus-job.c +++ b/src/core/dbus-job.c @@ -191,3 +191,71 @@ void bus_job_send_removed_signal(Job *j) { if (r < 0) log_debug_errno(r, "Failed to send job remove signal for %u: %m", j->id); } + +static int bus_job_track_handler(sd_bus_track *t, void *userdata) { + Job *j = userdata; + + assert(t); + assert(j); + + j->bus_track = sd_bus_track_unref(j->bus_track); /* make sure we aren't called again */ + + /* Last client dropped off the bus, maybe we should GC this now? */ + job_add_to_gc_queue(j); + return 0; +} + +static int bus_job_allocate_bus_track(Job *j) { + int r; + + assert(j); + + if (j->bus_track) + return 0; + + r = sd_bus_track_new(j->unit->manager->api_bus, &j->bus_track, bus_job_track_handler, j); + if (r < 0) + return r; + + return 0; +} + +int bus_job_coldplug_bus_track(Job *j) { + int r = 0; + + assert(j); + + if (strv_isempty(j->deserialized_clients)) + goto finish; + + if (!j->manager->api_bus) + goto finish; + + r = bus_job_allocate_bus_track(j); + if (r < 0) + goto finish; + + r = bus_track_add_name_many(j->bus_track, j->deserialized_clients); + +finish: + j->deserialized_clients = strv_free(j->deserialized_clients); + return r; +} + +int bus_job_track_sender(Job *j, sd_bus_message *m) { + int r; + + assert(j); + assert(m); + + if (sd_bus_message_get_bus(m) != j->unit->manager->api_bus) { + j->ref_by_private_bus = true; + return 0; + } + + r = bus_job_allocate_bus_track(j); + if (r < 0) + return r; + + return sd_bus_track_add_sender(j->bus_track, m); +} diff --git a/src/core/dbus-job.h b/src/core/dbus-job.h index 024d06719e..f9148895be 100644 --- a/src/core/dbus-job.h +++ b/src/core/dbus-job.h @@ -29,3 +29,6 @@ int bus_job_method_cancel(sd_bus_message *message, void *job, sd_bus_error *erro void bus_job_send_change_signal(Job *j); void bus_job_send_removed_signal(Job *j); + +int bus_job_coldplug_bus_track(Job *j); +int bus_job_track_sender(Job *j, sd_bus_message *m); diff --git a/src/core/dbus-unit.c b/src/core/dbus-unit.c index 90cf5651ca..2adc1d9288 100644 --- a/src/core/dbus-unit.c +++ b/src/core/dbus-unit.c @@ -22,6 +22,7 @@ #include "alloc-util.h" #include "bus-common-errors.h" #include "cgroup-util.h" +#include "dbus-job.h" #include "dbus-unit.h" #include "dbus.h" #include "fd-util.h" @@ -1223,17 +1224,9 @@ int bus_unit_queue_job( if (r < 0) return r; - if (sd_bus_message_get_bus(message) == u->manager->api_bus) { - if (!j->bus_track) { - r = sd_bus_track_new(sd_bus_message_get_bus(message), &j->bus_track, NULL, NULL); - if (r < 0) - return r; - } - - r = sd_bus_track_add_sender(j->bus_track, message); - if (r < 0) - return r; - } + r = bus_job_track_sender(j, message); + if (r < 0) + return r; path = job_dbus_path(j); if (!path) @@ -1507,7 +1500,7 @@ int bus_unit_check_load_state(Unit *u, sd_bus_error *error) { return sd_bus_error_set_errnof(error, u->load_error, "Unit %s is not loaded properly: %m.", u->id); } -static int bus_track_handler(sd_bus_track *t, void *userdata) { +static int bus_unit_track_handler(sd_bus_track *t, void *userdata) { Unit *u = userdata; assert(t); @@ -1519,7 +1512,7 @@ static int bus_track_handler(sd_bus_track *t, void *userdata) { return 0; } -static int allocate_bus_track(Unit *u) { +static int bus_unit_allocate_bus_track(Unit *u) { int r; assert(u); @@ -1527,7 +1520,7 @@ static int allocate_bus_track(Unit *u) { if (u->bus_track) return 0; - r = sd_bus_track_new(u->manager->api_bus, &u->bus_track, bus_track_handler, u); + r = sd_bus_track_new(u->manager->api_bus, &u->bus_track, bus_unit_track_handler, u); if (r < 0) return r; @@ -1545,7 +1538,7 @@ int bus_unit_track_add_name(Unit *u, const char *name) { assert(u); - r = allocate_bus_track(u); + r = bus_unit_allocate_bus_track(u); if (r < 0) return r; @@ -1557,7 +1550,7 @@ int bus_unit_track_add_sender(Unit *u, sd_bus_message *m) { assert(u); - r = allocate_bus_track(u); + r = bus_unit_allocate_bus_track(u); if (r < 0) return r; diff --git a/src/core/device.c b/src/core/device.c index c572a6737c..074e93ffe2 100644 --- a/src/core/device.c +++ b/src/core/device.c @@ -831,6 +831,8 @@ const UnitVTable device_vtable = { "Device\0" "Install\0", + .gc_jobs = true, + .init = device_init, .done = device_done, .load = unit_load_fragment_and_dropin_optional, diff --git a/src/core/job.c b/src/core/job.c index 3a20da6d06..d6e71d68ef 100644 --- a/src/core/job.c +++ b/src/core/job.c @@ -90,6 +90,9 @@ void job_free(Job *j) { if (j->in_dbus_queue) LIST_REMOVE(dbus_queue, j->manager->dbus_job_queue, j); + if (j->in_gc_queue) + LIST_REMOVE(gc_queue, j->manager->gc_job_queue, j); + sd_event_source_unref(j->timer_event_source); sd_bus_track_unref(j->bus_track); @@ -226,6 +229,9 @@ Job* job_install(Job *j) { log_unit_debug(j->unit, "Installed new job %s/%s as %u", j->unit->id, job_type_to_string(j->type), (unsigned) j->id); + + job_add_to_gc_queue(j); + return j; } @@ -639,6 +645,7 @@ _pure_ static const char *job_get_status_message_format(Unit *u, JobType t, JobR [JOB_DEPENDENCY] = "Dependency failed for %s.", [JOB_ASSERT] = "Assertion failed for %s.", [JOB_UNSUPPORTED] = "Starting of %s not supported.", + [JOB_COLLECTED] = "Unecessary job for %s was removed.", }; static const char *const generic_finished_stop_job[_JOB_RESULT_MAX] = { [JOB_DONE] = "Stopped %s.", @@ -698,6 +705,7 @@ static void job_print_status_message(Unit *u, JobType t, JobResult result) { [JOB_SKIPPED] = { ANSI_HIGHLIGHT, " INFO " }, [JOB_ASSERT] = { ANSI_HIGHLIGHT_YELLOW, "ASSERT" }, [JOB_UNSUPPORTED] = { ANSI_HIGHLIGHT_YELLOW, "UNSUPP" }, + [JOB_COLLECTED] = { ANSI_HIGHLIGHT, " INFO " }, }; const char *format; @@ -749,6 +757,7 @@ static void job_log_status_message(Unit *u, JobType t, JobResult result) { [JOB_INVALID] = LOG_INFO, [JOB_ASSERT] = LOG_WARNING, [JOB_UNSUPPORTED] = LOG_WARNING, + [JOB_COLLECTED] = LOG_INFO, }; assert(u); @@ -860,6 +869,7 @@ int job_finish_and_invalidate(Job *j, JobResult result, bool recursive, bool alr job_set_state(j, JOB_WAITING); job_add_to_run_queue(j); + job_add_to_gc_queue(j); goto finish; } @@ -903,11 +913,15 @@ int job_finish_and_invalidate(Job *j, JobResult result, bool recursive, bool alr finish: /* Try to start the next jobs that can be started */ SET_FOREACH(other, u->dependencies[UNIT_AFTER], i) - if (other->job) + if (other->job) { job_add_to_run_queue(other->job); + job_add_to_gc_queue(other->job); + } SET_FOREACH(other, u->dependencies[UNIT_BEFORE], i) - if (other->job) + if (other->job) { job_add_to_run_queue(other->job); + job_add_to_gc_queue(other->job); + } manager_check_finished(u->manager); @@ -1121,12 +1135,14 @@ int job_coldplug(Job *j) { /* After deserialization is complete and the bus connection * set up again, let's start watching our subscribers again */ - (void) bus_track_coldplug(j->manager, &j->bus_track, false, j->deserialized_clients); - j->deserialized_clients = strv_free(j->deserialized_clients); + (void) bus_job_coldplug_bus_track(j); if (j->state == JOB_WAITING) job_add_to_run_queue(j); + /* Maybe due to new dependencies we don't actually need this job anymore? */ + job_add_to_gc_queue(j); + if (j->begin_usec == 0 || j->unit->job_timeout == USEC_INFINITY) return 0; @@ -1201,6 +1217,95 @@ int job_get_timeout(Job *j, usec_t *timeout) { return 1; } +bool job_check_gc(Job *j) { + Unit *other; + Iterator i; + + assert(j); + + /* Checks whether this job should be GC'ed away. We only do this for jobs of units that have no effect on their + * own and just track external state. For now the only unit type that qualifies for this are .device units. */ + + if (!UNIT_VTABLE(j->unit)->gc_jobs) + return true; + + if (sd_bus_track_count(j->bus_track) > 0) + return true; + + /* FIXME: So this is a bit ugly: for now we don't properly track references made via private bus connections + * (because it's nasty, as sd_bus_track doesn't apply to it). We simply remember that the job was once + * referenced by one, and reset this whenever we notice that no private bus connections are around. This means + * the GC is a bit too conservative when it comes to jobs created by private bus connections. */ + if (j->ref_by_private_bus) { + if (set_isempty(j->unit->manager->private_buses)) + j->ref_by_private_bus = false; + else + return true; + } + + if (j->type == JOB_NOP) + return true; + + /* If a job is ordered after ours, and is to be started, then it needs to wait for us, regardless if we stop or + * start, hence let's not GC in that case. */ + SET_FOREACH(other, j->unit->dependencies[UNIT_BEFORE], i) { + if (!other->job) + continue; + + if (other->job->ignore_order) + continue; + + if (IN_SET(other->job->type, JOB_START, JOB_VERIFY_ACTIVE, JOB_RELOAD)) + return true; + } + + /* If we are going down, but something else is orederd After= us, then it needs to wait for us */ + if (IN_SET(j->type, JOB_STOP, JOB_RESTART)) { + + SET_FOREACH(other, j->unit->dependencies[UNIT_AFTER], i) { + if (!other->job) + continue; + + if (other->job->ignore_order) + continue; + + return true; + } + } + + /* The logic above is kinda the inverse of the job_is_runnable() logic. Specifically, if the job "we" is + * ordered before the job "other": + * + * we start + other start → stay + * we start + other stop → gc + * we stop + other start → stay + * we stop + other stop → gc + * + * "we" are ordered after "other": + * + * we start + other start → gc + * we start + other stop → gc + * we stop + other start → stay + * we stop + other stop → stay + * + */ + + return false; +} + +void job_add_to_gc_queue(Job *j) { + assert(j); + + if (j->in_gc_queue) + return; + + if (job_check_gc(j)) + return; + + LIST_PREPEND(gc_queue, j->unit->manager->gc_job_queue, j); + j->in_gc_queue = true; +} + static const char* const job_state_table[_JOB_STATE_MAX] = { [JOB_WAITING] = "waiting", [JOB_RUNNING] = "running", @@ -1244,6 +1349,7 @@ static const char* const job_result_table[_JOB_RESULT_MAX] = { [JOB_INVALID] = "invalid", [JOB_ASSERT] = "assert", [JOB_UNSUPPORTED] = "unsupported", + [JOB_COLLECTED] = "collected", }; DEFINE_STRING_TABLE_LOOKUP(job_result, JobResult); diff --git a/src/core/job.h b/src/core/job.h index ccfc7def4d..6fdec9f226 100644 --- a/src/core/job.h +++ b/src/core/job.h @@ -107,6 +107,7 @@ enum JobResult { JOB_INVALID, /* JOB_RELOAD of inactive unit */ JOB_ASSERT, /* Couldn't start a unit, because an assert didn't hold */ JOB_UNSUPPORTED, /* Couldn't start a unit, because the unit type is not supported on the system */ + JOB_COLLECTED, /* Job was garbage collected, since nothing needed it anymore */ _JOB_RESULT_MAX, _JOB_RESULT_INVALID = -1 }; @@ -133,6 +134,7 @@ struct Job { LIST_FIELDS(Job, transaction); LIST_FIELDS(Job, run_queue); LIST_FIELDS(Job, dbus_queue); + LIST_FIELDS(Job, gc_queue); LIST_HEAD(JobDependency, subject_list); LIST_HEAD(JobDependency, object_list); @@ -168,6 +170,8 @@ struct Job { bool sent_dbus_new_signal:1; bool ignore_order:1; bool irreversible:1; + bool in_gc_queue:1; + bool ref_by_private_bus:1; }; Job* job_new(Unit *unit, JobType type); @@ -227,6 +231,9 @@ void job_shutdown_magic(Job *j); int job_get_timeout(Job *j, usec_t *timeout) _pure_; +bool job_check_gc(Job *j); +void job_add_to_gc_queue(Job *j); + const char* job_type_to_string(JobType t) _const_; JobType job_type_from_string(const char *s) _pure_; diff --git a/src/core/manager.c b/src/core/manager.c index dc81af9492..31770eef3a 100644 --- a/src/core/manager.c +++ b/src/core/manager.c @@ -981,10 +981,9 @@ good: unit_gc_mark_good(u, gc_marker); } -static unsigned manager_dispatch_gc_queue(Manager *m) { +static unsigned manager_dispatch_gc_unit_queue(Manager *m) { + unsigned n = 0, gc_marker; Unit *u; - unsigned n = 0; - unsigned gc_marker; assert(m); @@ -996,12 +995,12 @@ static unsigned manager_dispatch_gc_queue(Manager *m) { gc_marker = m->gc_marker; - while ((u = m->gc_queue)) { + while ((u = m->gc_unit_queue)) { assert(u->in_gc_queue); unit_gc_sweep(u, gc_marker); - LIST_REMOVE(gc_queue, m->gc_queue, u); + LIST_REMOVE(gc_queue, m->gc_unit_queue, u); u->in_gc_queue = false; n++; @@ -1018,6 +1017,30 @@ static unsigned manager_dispatch_gc_queue(Manager *m) { return n; } +static unsigned manager_dispatch_gc_job_queue(Manager *m) { + unsigned n = 0; + Job *j; + + assert(m); + + while ((j = m->gc_job_queue)) { + assert(j->in_gc_queue); + + LIST_REMOVE(gc_queue, m->gc_job_queue, j); + j->in_gc_queue = false; + + n++; + + if (job_check_gc(j)) + continue; + + log_unit_debug(j->unit, "Collecting job."); + (void) job_finish_and_invalidate(j, JOB_COLLECTED, false, false); + } + + return n; +} + static void manager_clear_jobs_and_units(Manager *m) { Unit *u; @@ -1033,7 +1056,8 @@ static void manager_clear_jobs_and_units(Manager *m) { assert(!m->dbus_unit_queue); assert(!m->dbus_job_queue); assert(!m->cleanup_queue); - assert(!m->gc_queue); + assert(!m->gc_unit_queue); + assert(!m->gc_job_queue); assert(hashmap_isempty(m->jobs)); assert(hashmap_isempty(m->units)); @@ -2226,7 +2250,10 @@ int manager_loop(Manager *m) { if (manager_dispatch_load_queue(m) > 0) continue; - if (manager_dispatch_gc_queue(m) > 0) + if (manager_dispatch_gc_job_queue(m) > 0) + continue; + + if (manager_dispatch_gc_unit_queue(m) > 0) continue; if (manager_dispatch_cleanup_queue(m) > 0) diff --git a/src/core/manager.h b/src/core/manager.h index aa3f95e8e0..d54ca54107 100644 --- a/src/core/manager.h +++ b/src/core/manager.h @@ -104,8 +104,9 @@ struct Manager { /* Units to remove */ LIST_HEAD(Unit, cleanup_queue); - /* Units to check when doing GC */ - LIST_HEAD(Unit, gc_queue); + /* Units and jobs to check when doing GC */ + LIST_HEAD(Unit, gc_unit_queue); + LIST_HEAD(Job, gc_job_queue); /* Units that should be realized */ LIST_HEAD(Unit, cgroup_queue); diff --git a/src/core/unit.c b/src/core/unit.c index df60a5bf04..fbb21e4985 100644 --- a/src/core/unit.c +++ b/src/core/unit.c @@ -389,7 +389,7 @@ void unit_add_to_gc_queue(Unit *u) { if (unit_check_gc(u)) return; - LIST_PREPEND(gc_queue, u->manager->gc_queue, u); + LIST_PREPEND(gc_queue, u->manager->gc_unit_queue, u); u->in_gc_queue = true; } @@ -569,7 +569,7 @@ void unit_free(Unit *u) { LIST_REMOVE(cleanup_queue, u->manager->cleanup_queue, u); if (u->in_gc_queue) - LIST_REMOVE(gc_queue, u->manager->gc_queue, u); + LIST_REMOVE(gc_queue, u->manager->gc_unit_queue, u); if (u->in_cgroup_queue) LIST_REMOVE(cgroup_queue, u->manager->cgroup_queue, u); diff --git a/src/core/unit.h b/src/core/unit.h index 991543664b..6d6885b487 100644 --- a/src/core/unit.h +++ b/src/core/unit.h @@ -441,6 +441,9 @@ struct UnitVTable { /* True if transient units of this type are OK */ bool can_transient:1; + + /* True if queued jobs of this type should be GC'ed if no other job needs them anymore */ + bool gc_jobs:1; }; extern const UnitVTable * const unit_vtable[_UNIT_TYPE_MAX]; diff --git a/src/shared/bus-unit-util.c b/src/shared/bus-unit-util.c index 4f66497f3a..a7702602eb 100644 --- a/src/shared/bus-unit-util.c +++ b/src/shared/bus-unit-util.c @@ -838,6 +838,8 @@ static int check_wait_response(BusWaitForJobs *d, bool quiet, const char* const* log_error("Assertion failed on job for %s.", strna(d->name)); else if (streq(d->result, "unsupported")) log_error("Operation on or unit type of %s not supported on this system.", strna(d->name)); + else if (streq(d->result, "collected")) + log_error("Queued job for %s was garbage collected.", strna(d->name)); else if (!streq(d->result, "done") && !streq(d->result, "skipped")) { if (d->name) { int q; @@ -853,7 +855,7 @@ static int check_wait_response(BusWaitForJobs *d, bool quiet, const char* const* } } - if (streq(d->result, "canceled")) + if (STR_IN_SET(d->result, "canceled", "collected")) r = -ECANCELED; else if (streq(d->result, "timeout")) r = -ETIME; |