diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/core/dbus-job.c | 68 | ||||
-rw-r--r-- | src/core/dbus-job.h | 3 | ||||
-rw-r--r-- | src/core/dbus-unit.c | 25 | ||||
-rw-r--r-- | src/core/device.c | 2 | ||||
-rw-r--r-- | src/core/job.c | 114 | ||||
-rw-r--r-- | src/core/job.h | 7 | ||||
-rw-r--r-- | src/core/manager.c | 41 | ||||
-rw-r--r-- | src/core/manager.h | 5 | ||||
-rw-r--r-- | src/core/unit.c | 4 | ||||
-rw-r--r-- | src/core/unit.h | 3 | ||||
-rw-r--r-- | src/shared/bus-unit-util.c | 4 |
11 files changed, 244 insertions, 32 deletions
diff --git a/src/core/dbus-job.c b/src/core/dbus-job.c index e8c69ed3e4..7888c163f1 100644 --- a/src/core/dbus-job.c +++ b/src/core/dbus-job.c @@ -191,3 +191,71 @@ void bus_job_send_removed_signal(Job *j) { if (r < 0) log_debug_errno(r, "Failed to send job remove signal for %u: %m", j->id); } + +static int bus_job_track_handler(sd_bus_track *t, void *userdata) { + Job *j = userdata; + + assert(t); + assert(j); + + j->bus_track = sd_bus_track_unref(j->bus_track); /* make sure we aren't called again */ + + /* Last client dropped off the bus, maybe we should GC this now? */ + job_add_to_gc_queue(j); + return 0; +} + +static int bus_job_allocate_bus_track(Job *j) { + int r; + + assert(j); + + if (j->bus_track) + return 0; + + r = sd_bus_track_new(j->unit->manager->api_bus, &j->bus_track, bus_job_track_handler, j); + if (r < 0) + return r; + + return 0; +} + +int bus_job_coldplug_bus_track(Job *j) { + int r = 0; + + assert(j); + + if (strv_isempty(j->deserialized_clients)) + goto finish; + + if (!j->manager->api_bus) + goto finish; + + r = bus_job_allocate_bus_track(j); + if (r < 0) + goto finish; + + r = bus_track_add_name_many(j->bus_track, j->deserialized_clients); + +finish: + j->deserialized_clients = strv_free(j->deserialized_clients); + return r; +} + +int bus_job_track_sender(Job *j, sd_bus_message *m) { + int r; + + assert(j); + assert(m); + + if (sd_bus_message_get_bus(m) != j->unit->manager->api_bus) { + j->ref_by_private_bus = true; + return 0; + } + + r = bus_job_allocate_bus_track(j); + if (r < 0) + return r; + + return sd_bus_track_add_sender(j->bus_track, m); +} diff --git a/src/core/dbus-job.h b/src/core/dbus-job.h index 024d06719e..f9148895be 100644 --- a/src/core/dbus-job.h +++ b/src/core/dbus-job.h @@ -29,3 +29,6 @@ int bus_job_method_cancel(sd_bus_message *message, void *job, sd_bus_error *erro void bus_job_send_change_signal(Job *j); void bus_job_send_removed_signal(Job *j); + +int bus_job_coldplug_bus_track(Job *j); +int bus_job_track_sender(Job *j, sd_bus_message *m); diff --git a/src/core/dbus-unit.c b/src/core/dbus-unit.c index 90cf5651ca..2adc1d9288 100644 --- a/src/core/dbus-unit.c +++ b/src/core/dbus-unit.c @@ -22,6 +22,7 @@ #include "alloc-util.h" #include "bus-common-errors.h" #include "cgroup-util.h" +#include "dbus-job.h" #include "dbus-unit.h" #include "dbus.h" #include "fd-util.h" @@ -1223,17 +1224,9 @@ int bus_unit_queue_job( if (r < 0) return r; - if (sd_bus_message_get_bus(message) == u->manager->api_bus) { - if (!j->bus_track) { - r = sd_bus_track_new(sd_bus_message_get_bus(message), &j->bus_track, NULL, NULL); - if (r < 0) - return r; - } - - r = sd_bus_track_add_sender(j->bus_track, message); - if (r < 0) - return r; - } + r = bus_job_track_sender(j, message); + if (r < 0) + return r; path = job_dbus_path(j); if (!path) @@ -1507,7 +1500,7 @@ int bus_unit_check_load_state(Unit *u, sd_bus_error *error) { return sd_bus_error_set_errnof(error, u->load_error, "Unit %s is not loaded properly: %m.", u->id); } -static int bus_track_handler(sd_bus_track *t, void *userdata) { +static int bus_unit_track_handler(sd_bus_track *t, void *userdata) { Unit *u = userdata; assert(t); @@ -1519,7 +1512,7 @@ static int bus_track_handler(sd_bus_track *t, void *userdata) { return 0; } -static int allocate_bus_track(Unit *u) { +static int bus_unit_allocate_bus_track(Unit *u) { int r; assert(u); @@ -1527,7 +1520,7 @@ static int allocate_bus_track(Unit *u) { if (u->bus_track) return 0; - r = sd_bus_track_new(u->manager->api_bus, &u->bus_track, bus_track_handler, u); + r = sd_bus_track_new(u->manager->api_bus, &u->bus_track, bus_unit_track_handler, u); if (r < 0) return r; @@ -1545,7 +1538,7 @@ int bus_unit_track_add_name(Unit *u, const char *name) { assert(u); - r = allocate_bus_track(u); + r = bus_unit_allocate_bus_track(u); if (r < 0) return r; @@ -1557,7 +1550,7 @@ int bus_unit_track_add_sender(Unit *u, sd_bus_message *m) { assert(u); - r = allocate_bus_track(u); + r = bus_unit_allocate_bus_track(u); if (r < 0) return r; diff --git a/src/core/device.c b/src/core/device.c index c572a6737c..074e93ffe2 100644 --- a/src/core/device.c +++ b/src/core/device.c @@ -831,6 +831,8 @@ const UnitVTable device_vtable = { "Device\0" "Install\0", + .gc_jobs = true, + .init = device_init, .done = device_done, .load = unit_load_fragment_and_dropin_optional, diff --git a/src/core/job.c b/src/core/job.c index 3a20da6d06..d6e71d68ef 100644 --- a/src/core/job.c +++ b/src/core/job.c @@ -90,6 +90,9 @@ void job_free(Job *j) { if (j->in_dbus_queue) LIST_REMOVE(dbus_queue, j->manager->dbus_job_queue, j); + if (j->in_gc_queue) + LIST_REMOVE(gc_queue, j->manager->gc_job_queue, j); + sd_event_source_unref(j->timer_event_source); sd_bus_track_unref(j->bus_track); @@ -226,6 +229,9 @@ Job* job_install(Job *j) { log_unit_debug(j->unit, "Installed new job %s/%s as %u", j->unit->id, job_type_to_string(j->type), (unsigned) j->id); + + job_add_to_gc_queue(j); + return j; } @@ -639,6 +645,7 @@ _pure_ static const char *job_get_status_message_format(Unit *u, JobType t, JobR [JOB_DEPENDENCY] = "Dependency failed for %s.", [JOB_ASSERT] = "Assertion failed for %s.", [JOB_UNSUPPORTED] = "Starting of %s not supported.", + [JOB_COLLECTED] = "Unecessary job for %s was removed.", }; static const char *const generic_finished_stop_job[_JOB_RESULT_MAX] = { [JOB_DONE] = "Stopped %s.", @@ -698,6 +705,7 @@ static void job_print_status_message(Unit *u, JobType t, JobResult result) { [JOB_SKIPPED] = { ANSI_HIGHLIGHT, " INFO " }, [JOB_ASSERT] = { ANSI_HIGHLIGHT_YELLOW, "ASSERT" }, [JOB_UNSUPPORTED] = { ANSI_HIGHLIGHT_YELLOW, "UNSUPP" }, + [JOB_COLLECTED] = { ANSI_HIGHLIGHT, " INFO " }, }; const char *format; @@ -749,6 +757,7 @@ static void job_log_status_message(Unit *u, JobType t, JobResult result) { [JOB_INVALID] = LOG_INFO, [JOB_ASSERT] = LOG_WARNING, [JOB_UNSUPPORTED] = LOG_WARNING, + [JOB_COLLECTED] = LOG_INFO, }; assert(u); @@ -860,6 +869,7 @@ int job_finish_and_invalidate(Job *j, JobResult result, bool recursive, bool alr job_set_state(j, JOB_WAITING); job_add_to_run_queue(j); + job_add_to_gc_queue(j); goto finish; } @@ -903,11 +913,15 @@ int job_finish_and_invalidate(Job *j, JobResult result, bool recursive, bool alr finish: /* Try to start the next jobs that can be started */ SET_FOREACH(other, u->dependencies[UNIT_AFTER], i) - if (other->job) + if (other->job) { job_add_to_run_queue(other->job); + job_add_to_gc_queue(other->job); + } SET_FOREACH(other, u->dependencies[UNIT_BEFORE], i) - if (other->job) + if (other->job) { job_add_to_run_queue(other->job); + job_add_to_gc_queue(other->job); + } manager_check_finished(u->manager); @@ -1121,12 +1135,14 @@ int job_coldplug(Job *j) { /* After deserialization is complete and the bus connection * set up again, let's start watching our subscribers again */ - (void) bus_track_coldplug(j->manager, &j->bus_track, false, j->deserialized_clients); - j->deserialized_clients = strv_free(j->deserialized_clients); + (void) bus_job_coldplug_bus_track(j); if (j->state == JOB_WAITING) job_add_to_run_queue(j); + /* Maybe due to new dependencies we don't actually need this job anymore? */ + job_add_to_gc_queue(j); + if (j->begin_usec == 0 || j->unit->job_timeout == USEC_INFINITY) return 0; @@ -1201,6 +1217,95 @@ int job_get_timeout(Job *j, usec_t *timeout) { return 1; } +bool job_check_gc(Job *j) { + Unit *other; + Iterator i; + + assert(j); + + /* Checks whether this job should be GC'ed away. We only do this for jobs of units that have no effect on their + * own and just track external state. For now the only unit type that qualifies for this are .device units. */ + + if (!UNIT_VTABLE(j->unit)->gc_jobs) + return true; + + if (sd_bus_track_count(j->bus_track) > 0) + return true; + + /* FIXME: So this is a bit ugly: for now we don't properly track references made via private bus connections + * (because it's nasty, as sd_bus_track doesn't apply to it). We simply remember that the job was once + * referenced by one, and reset this whenever we notice that no private bus connections are around. This means + * the GC is a bit too conservative when it comes to jobs created by private bus connections. */ + if (j->ref_by_private_bus) { + if (set_isempty(j->unit->manager->private_buses)) + j->ref_by_private_bus = false; + else + return true; + } + + if (j->type == JOB_NOP) + return true; + + /* If a job is ordered after ours, and is to be started, then it needs to wait for us, regardless if we stop or + * start, hence let's not GC in that case. */ + SET_FOREACH(other, j->unit->dependencies[UNIT_BEFORE], i) { + if (!other->job) + continue; + + if (other->job->ignore_order) + continue; + + if (IN_SET(other->job->type, JOB_START, JOB_VERIFY_ACTIVE, JOB_RELOAD)) + return true; + } + + /* If we are going down, but something else is orederd After= us, then it needs to wait for us */ + if (IN_SET(j->type, JOB_STOP, JOB_RESTART)) { + + SET_FOREACH(other, j->unit->dependencies[UNIT_AFTER], i) { + if (!other->job) + continue; + + if (other->job->ignore_order) + continue; + + return true; + } + } + + /* The logic above is kinda the inverse of the job_is_runnable() logic. Specifically, if the job "we" is + * ordered before the job "other": + * + * we start + other start → stay + * we start + other stop → gc + * we stop + other start → stay + * we stop + other stop → gc + * + * "we" are ordered after "other": + * + * we start + other start → gc + * we start + other stop → gc + * we stop + other start → stay + * we stop + other stop → stay + * + */ + + return false; +} + +void job_add_to_gc_queue(Job *j) { + assert(j); + + if (j->in_gc_queue) + return; + + if (job_check_gc(j)) + return; + + LIST_PREPEND(gc_queue, j->unit->manager->gc_job_queue, j); + j->in_gc_queue = true; +} + static const char* const job_state_table[_JOB_STATE_MAX] = { [JOB_WAITING] = "waiting", [JOB_RUNNING] = "running", @@ -1244,6 +1349,7 @@ static const char* const job_result_table[_JOB_RESULT_MAX] = { [JOB_INVALID] = "invalid", [JOB_ASSERT] = "assert", [JOB_UNSUPPORTED] = "unsupported", + [JOB_COLLECTED] = "collected", }; DEFINE_STRING_TABLE_LOOKUP(job_result, JobResult); diff --git a/src/core/job.h b/src/core/job.h index ccfc7def4d..6fdec9f226 100644 --- a/src/core/job.h +++ b/src/core/job.h @@ -107,6 +107,7 @@ enum JobResult { JOB_INVALID, /* JOB_RELOAD of inactive unit */ JOB_ASSERT, /* Couldn't start a unit, because an assert didn't hold */ JOB_UNSUPPORTED, /* Couldn't start a unit, because the unit type is not supported on the system */ + JOB_COLLECTED, /* Job was garbage collected, since nothing needed it anymore */ _JOB_RESULT_MAX, _JOB_RESULT_INVALID = -1 }; @@ -133,6 +134,7 @@ struct Job { LIST_FIELDS(Job, transaction); LIST_FIELDS(Job, run_queue); LIST_FIELDS(Job, dbus_queue); + LIST_FIELDS(Job, gc_queue); LIST_HEAD(JobDependency, subject_list); LIST_HEAD(JobDependency, object_list); @@ -168,6 +170,8 @@ struct Job { bool sent_dbus_new_signal:1; bool ignore_order:1; bool irreversible:1; + bool in_gc_queue:1; + bool ref_by_private_bus:1; }; Job* job_new(Unit *unit, JobType type); @@ -227,6 +231,9 @@ void job_shutdown_magic(Job *j); int job_get_timeout(Job *j, usec_t *timeout) _pure_; +bool job_check_gc(Job *j); +void job_add_to_gc_queue(Job *j); + const char* job_type_to_string(JobType t) _const_; JobType job_type_from_string(const char *s) _pure_; diff --git a/src/core/manager.c b/src/core/manager.c index dc81af9492..31770eef3a 100644 --- a/src/core/manager.c +++ b/src/core/manager.c @@ -981,10 +981,9 @@ good: unit_gc_mark_good(u, gc_marker); } -static unsigned manager_dispatch_gc_queue(Manager *m) { +static unsigned manager_dispatch_gc_unit_queue(Manager *m) { + unsigned n = 0, gc_marker; Unit *u; - unsigned n = 0; - unsigned gc_marker; assert(m); @@ -996,12 +995,12 @@ static unsigned manager_dispatch_gc_queue(Manager *m) { gc_marker = m->gc_marker; - while ((u = m->gc_queue)) { + while ((u = m->gc_unit_queue)) { assert(u->in_gc_queue); unit_gc_sweep(u, gc_marker); - LIST_REMOVE(gc_queue, m->gc_queue, u); + LIST_REMOVE(gc_queue, m->gc_unit_queue, u); u->in_gc_queue = false; n++; @@ -1018,6 +1017,30 @@ static unsigned manager_dispatch_gc_queue(Manager *m) { return n; } +static unsigned manager_dispatch_gc_job_queue(Manager *m) { + unsigned n = 0; + Job *j; + + assert(m); + + while ((j = m->gc_job_queue)) { + assert(j->in_gc_queue); + + LIST_REMOVE(gc_queue, m->gc_job_queue, j); + j->in_gc_queue = false; + + n++; + + if (job_check_gc(j)) + continue; + + log_unit_debug(j->unit, "Collecting job."); + (void) job_finish_and_invalidate(j, JOB_COLLECTED, false, false); + } + + return n; +} + static void manager_clear_jobs_and_units(Manager *m) { Unit *u; @@ -1033,7 +1056,8 @@ static void manager_clear_jobs_and_units(Manager *m) { assert(!m->dbus_unit_queue); assert(!m->dbus_job_queue); assert(!m->cleanup_queue); - assert(!m->gc_queue); + assert(!m->gc_unit_queue); + assert(!m->gc_job_queue); assert(hashmap_isempty(m->jobs)); assert(hashmap_isempty(m->units)); @@ -2226,7 +2250,10 @@ int manager_loop(Manager *m) { if (manager_dispatch_load_queue(m) > 0) continue; - if (manager_dispatch_gc_queue(m) > 0) + if (manager_dispatch_gc_job_queue(m) > 0) + continue; + + if (manager_dispatch_gc_unit_queue(m) > 0) continue; if (manager_dispatch_cleanup_queue(m) > 0) diff --git a/src/core/manager.h b/src/core/manager.h index aa3f95e8e0..d54ca54107 100644 --- a/src/core/manager.h +++ b/src/core/manager.h @@ -104,8 +104,9 @@ struct Manager { /* Units to remove */ LIST_HEAD(Unit, cleanup_queue); - /* Units to check when doing GC */ - LIST_HEAD(Unit, gc_queue); + /* Units and jobs to check when doing GC */ + LIST_HEAD(Unit, gc_unit_queue); + LIST_HEAD(Job, gc_job_queue); /* Units that should be realized */ LIST_HEAD(Unit, cgroup_queue); diff --git a/src/core/unit.c b/src/core/unit.c index df60a5bf04..fbb21e4985 100644 --- a/src/core/unit.c +++ b/src/core/unit.c @@ -389,7 +389,7 @@ void unit_add_to_gc_queue(Unit *u) { if (unit_check_gc(u)) return; - LIST_PREPEND(gc_queue, u->manager->gc_queue, u); + LIST_PREPEND(gc_queue, u->manager->gc_unit_queue, u); u->in_gc_queue = true; } @@ -569,7 +569,7 @@ void unit_free(Unit *u) { LIST_REMOVE(cleanup_queue, u->manager->cleanup_queue, u); if (u->in_gc_queue) - LIST_REMOVE(gc_queue, u->manager->gc_queue, u); + LIST_REMOVE(gc_queue, u->manager->gc_unit_queue, u); if (u->in_cgroup_queue) LIST_REMOVE(cgroup_queue, u->manager->cgroup_queue, u); diff --git a/src/core/unit.h b/src/core/unit.h index 991543664b..6d6885b487 100644 --- a/src/core/unit.h +++ b/src/core/unit.h @@ -441,6 +441,9 @@ struct UnitVTable { /* True if transient units of this type are OK */ bool can_transient:1; + + /* True if queued jobs of this type should be GC'ed if no other job needs them anymore */ + bool gc_jobs:1; }; extern const UnitVTable * const unit_vtable[_UNIT_TYPE_MAX]; diff --git a/src/shared/bus-unit-util.c b/src/shared/bus-unit-util.c index 4f66497f3a..a7702602eb 100644 --- a/src/shared/bus-unit-util.c +++ b/src/shared/bus-unit-util.c @@ -838,6 +838,8 @@ static int check_wait_response(BusWaitForJobs *d, bool quiet, const char* const* log_error("Assertion failed on job for %s.", strna(d->name)); else if (streq(d->result, "unsupported")) log_error("Operation on or unit type of %s not supported on this system.", strna(d->name)); + else if (streq(d->result, "collected")) + log_error("Queued job for %s was garbage collected.", strna(d->name)); else if (!streq(d->result, "done") && !streq(d->result, "skipped")) { if (d->name) { int q; @@ -853,7 +855,7 @@ static int check_wait_response(BusWaitForJobs *d, bool quiet, const char* const* } } - if (streq(d->result, "canceled")) + if (STR_IN_SET(d->result, "canceled", "collected")) r = -ECANCELED; else if (streq(d->result, "timeout")) r = -ETIME; |