Trigger notification on ttl expire

This commit is contained in:
kpcyrd 2020-06-13 00:48:11 +02:00
parent b838dc5b31
commit eb2e6203be
9 changed files with 87 additions and 32 deletions

View File

@ -0,0 +1,18 @@
PRAGMA foreign_keys=off;
CREATE TABLE _ttls_new (
id INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL,
family VARCHAR NOT NULL,
key INTEGER NOT NULL,
expire DATETIME NOT NULL,
CONSTRAINT ttl_unique UNIQUE (family, key)
);
INSERT INTO _ttls_new (id, family, key, expire)
SELECT id, family, key, expire
FROM ttls;
DROP TABLE ttls;
ALTER TABLE _ttls_new RENAME TO ttls;
PRAGMA foreign_keys=on;

View File

@ -0,0 +1,19 @@
PRAGMA foreign_keys=off;
CREATE TABLE _ttls_new (
id INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL,
family VARCHAR NOT NULL,
key INTEGER NOT NULL,
value VARCHAR NOT NULL,
expire DATETIME NOT NULL,
CONSTRAINT ttl_unique UNIQUE (family, key)
);
INSERT INTO _ttls_new (id, family, key, value, expire)
SELECT id, family, key, "unknown", expire
FROM ttls;
DROP TABLE ttls;
ALTER TABLE _ttls_new RENAME TO ttls;
PRAGMA foreign_keys=on;

View File

@ -3,6 +3,7 @@ use crate::errors::*;
use crate::blobs::Blob;
use crate::cmd::Cmd;
use crate::db::ttl;
use crate::models::*;
use crate::shell::Shell;
use serde_json;
use serde::Serialize;
@ -10,7 +11,6 @@ use std::io::{self, Write};
use structopt::StructOpt;
use structopt::clap::AppSettings;
use strum_macros::{EnumString, IntoStaticStr};
use crate::models::*;
#[derive(Debug, StructOpt)]
#[structopt(global_settings = &[AppSettings::ColoredHelp])]
@ -22,7 +22,7 @@ pub struct Args {
impl Cmd for Args {
fn run(self, rl: &mut Shell) -> Result<()> {
ttl::reap_expired(rl.db())?;
ttl::reap_expired(rl)?;
match self.format {
Format::Json => export::<JsonFormat>(rl),
Format::JsonBlobs => export::<JsonBlobsFormat>(rl),

View File

@ -207,7 +207,7 @@ pub fn execute(rl: &mut Shell, params: Params, options: HashMap<String, String>)
impl Cmd for Args {
fn run(self, rl: &mut Shell) -> Result<()> {
ttl::reap_expired(rl.db())?;
ttl::reap_expired(rl)?;
let options = match rl.options_mut() {
Some(options) => options.clone(),
_ => HashMap::new(),

View File

@ -94,7 +94,7 @@ impl<'a, 'b> Printer<'a, 'b> {
impl Cmd for Args {
fn run(self, rl: &mut Shell) -> Result<()> {
ttl::reap_expired(rl.db())?;
ttl::reap_expired(rl)?;
let printer = Printer::new(rl, &self);
match &self.subcommand {

View File

@ -1,7 +1,10 @@
use crate::errors::*;
use crate::db::{Database, Table};
use crate::schema::*;
use crate::errors::*;
use crate::models::*;
use crate::notify::{self, Notification};
use crate::schema::*;
use crate::shell::Shell;
use crate::term::{self, Term};
use chrono::{NaiveDateTime, Duration, Utc};
use diesel;
use diesel::prelude::*;
@ -13,6 +16,7 @@ pub struct Ttl {
pub id: i32,
pub family: String,
pub key: i32,
pub value: String,
pub expire: NaiveDateTime,
}
@ -21,14 +25,16 @@ pub struct Ttl {
pub struct NewTtl<'a> {
pub family: &'a str,
pub key: i32,
pub value: String,
pub expire: NaiveDateTime,
}
impl Ttl {
pub fn new(obj: &Insert, key: i32, expire: NaiveDateTime) -> NewTtl {
pub fn new(obj: &Insert, key: i32, value: String, expire: NaiveDateTime) -> NewTtl {
NewTtl {
family: obj.table(),
key,
value,
expire,
}
}
@ -59,7 +65,7 @@ impl Ttl {
expire_at.naive_utc()
}
pub fn create(obj: &Insert, key: i32, ttl: i32, db: &Database) -> Result<()> {
pub fn create(obj: &Insert, key: i32, value: String, ttl: i32, db: &Database) -> Result<()> {
debug!("Creating ttl on record");
let expire = Self::ttl_to_datetime(ttl);
@ -67,6 +73,7 @@ impl Ttl {
.values(NewTtl {
family: obj.table(),
key,
value,
expire,
})
.execute(db.db())?;
@ -127,12 +134,21 @@ impl Ttl {
}
}
pub fn reap_expired(db: &Database) -> Result<()> {
pub fn reap_expired(rl: &mut Shell) -> Result<()> {
debug!("Reaping expired entities");
for expired in Ttl::expired(db)? {
for expired in Ttl::expired(rl.db())? {
debug!("Expired: {:?}", expired);
expired.delete(db)?;
expired.delete(rl.db())?;
let subject = format!("Deleted {} {:?}", &expired.family, &expired.value);
let topic = &format!("db:{}:{}:delete", &expired.family, &expired.value);
if let Err(err) = notify::trigger_notify_event(rl, &mut Term, &topic, &Notification {
subject,
body: None,
}) {
term::error(&format!("Failed to send notifications: {}", err));
}
}
debug!("Finished reaping expired entities");

View File

@ -228,6 +228,7 @@ table! {
id -> Integer,
family -> Text,
key -> Integer,
value -> Text,
expire -> Timestamp,
}
}

View File

@ -509,7 +509,6 @@ pub fn init<'a>(args: &Args, config: &'a Config, verbose_init: bool) -> Result<S
} else {
Database::establish_quiet(workspace)?
};
ttl::reap_expired(&db)?;
let cache_dir = paths::cache_dir()?;
let psl = PslReader::open_or_download(&cache_dir,
@ -529,7 +528,9 @@ pub fn init<'a>(args: &Args, config: &'a Config, verbose_init: bool) -> Result<S
}
autoupdate.check_background(&config, library.list());
let rl = Shell::new(&config, db, blobs, psl, library, keyring);
let mut rl = Shell::new(&config, db, blobs, psl, library, keyring);
ttl::reap_expired(&mut rl)?;
Ok(rl)
}

View File

@ -178,21 +178,14 @@ impl DatabaseEvent {
}
}
fn on_insert<T: SpinLogger>(rl: &mut Shell, spinner: &mut T, object: &Insert) {
match object.value(rl.db()) {
Ok(value) => {
// TODO: also include fields, see update
let log = format!("Adding {} {:?}", object.family(), value); // TODO: also include fields here
spinner.log(&log);
fn on_insert<T: SpinLogger>(rl: &mut Shell, spinner: &mut T, family: &str, value: &str) {
// TODO: also include fields, see update
let log = format!("Adding {} {:?}", family, value);
spinner.log(&log);
let subject = format!("Added {} {:?}", object.family(), value);
let topic = format!("db:{}:{}:insert", object.family(), value);
Self::notify(rl, spinner, &topic, subject);
}
Err(err) => {
spinner.error(&format!("Failed to query necessary fields for {:?}: {:?}", object, err));
}
}
let subject = format!("Added {} {:?}", family, value);
let topic = format!("db:{}:{}:insert", family, value);
Self::notify(rl, spinner, &topic, subject);
}
fn on_update<T: SpinLogger>(rl: &mut Shell, spinner: &mut T, family: &str, value: &str, update: &Update) {
@ -250,14 +243,21 @@ impl DatabaseEvent {
let result = match result {
Ok(Some((DbChange::Insert, id))) => {
if let Some(ttl) = ttl {
if let Err(err) = Ttl::create(&object, id, ttl, db) {
spinner.error(&format!("Failed to set ttl: {:?}", err));
match object.value(rl.db()) {
Ok(value) => {
if let Some(ttl) = ttl {
if let Err(err) = Ttl::create(&object, id, value.to_string(), ttl, db) {
spinner.error(&format!("Failed to set ttl: {:?}", err));
}
}
Self::on_insert(rl, spinner, object.family(), &value);
}
Err(err) => {
spinner.error(&format!("Failed to query necessary fields for {:?}: {:?}", object, err));
}
}
Self::on_insert(rl, spinner, &object);
Ok(DatabaseResponse::Inserted(id))
},
Ok(Some((DbChange::Update(update), id))) => {