Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 100 additions & 1 deletion src/payment/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,9 @@ impl StorableObject for PaymentDetails {
if let Some(tx_type_update) = update.tx_type {
match self.kind {
PaymentKind::Onchain { ref mut tx_type, .. } => {
update_if_necessary!(*tx_type, tx_type_update);
if tx_type.is_none() || tx_type_update.is_some() {
update_if_necessary!(*tx_type, tx_type_update);
}
},
_ => {},
}
Expand Down Expand Up @@ -921,6 +923,103 @@ mod tests {
assert_eq!(kind, PaymentKind::read(&mut &*kind.encode()).unwrap());
}

#[test]
fn known_onchain_tx_type_survives_unknown_update() {
use bitcoin::hashes::Hash;
use std::str::FromStr;

let txid = Txid::from_byte_array([8u8; 32]);
let payment_id = PaymentId(txid.to_byte_array());
let pubkey = PublicKey::from_str(
"0279be667ef9dcbbac55a06295ce870b07029bfcdb2dce28d959f2815b16f81798",
)
.unwrap();
let tx_type = TransactionType::CooperativeClose {
counterparty_node_id: pubkey,
channel_id: ChannelId([4u8; 32]),
};
let mut classified = PaymentDetails::new(
payment_id,
PaymentKind::Onchain {
txid,
status: ConfirmationStatus::Unconfirmed,
tx_type: Some(tx_type.clone()),
},
Some(1_000),
Some(100),
PaymentDirection::Inbound,
PaymentStatus::Pending,
);
let wallet_sync_update = PaymentDetails::new(
payment_id,
PaymentKind::Onchain {
txid,
status: ConfirmationStatus::Confirmed {
block_hash: BlockHash::from_byte_array([9u8; 32]),
height: 42,
timestamp: 123,
},
tx_type: None,
},
Some(1_000),
Some(100),
PaymentDirection::Inbound,
PaymentStatus::Pending,
);

assert!(classified.update(PaymentDetailsUpdate::from(&wallet_sync_update)));
match classified.kind {
PaymentKind::Onchain { status, tx_type: Some(updated_tx_type), .. } => {
assert!(matches!(status, ConfirmationStatus::Confirmed { height: 42, .. }));
assert_eq!(updated_tx_type, tx_type);
},
other => panic!("Unexpected payment kind: {:?}", other),
}
}

#[test]
fn transaction_type_from_ldk_variants() {
use std::str::FromStr;

let pubkey = PublicKey::from_str(
"0279be667ef9dcbbac55a06295ce870b07029bfcdb2dce28d959f2815b16f81798",
)
.unwrap();
let channel_id = ChannelId([5u8; 32]);
let channel = Channel { counterparty_node_id: pubkey, channel_id };

let variants = vec![
(
LdkTransactionType::Funding { channels: vec![(pubkey, channel_id)] },
TransactionType::Funding { channels: vec![channel.clone()] },
),
(
LdkTransactionType::CooperativeClose { counterparty_node_id: pubkey, channel_id },
TransactionType::CooperativeClose { counterparty_node_id: pubkey, channel_id },
),
(
LdkTransactionType::UnilateralClose { counterparty_node_id: pubkey, channel_id },
TransactionType::UnilateralClose { counterparty_node_id: pubkey, channel_id },
),
(
LdkTransactionType::AnchorBump { counterparty_node_id: pubkey, channel_id },
TransactionType::AnchorBump { counterparty_node_id: pubkey, channel_id },
),
(
LdkTransactionType::Claim { counterparty_node_id: pubkey, channel_id },
TransactionType::Claim { counterparty_node_id: pubkey, channel_id },
),
(
LdkTransactionType::Sweep { channels: vec![(pubkey, channel_id)] },
TransactionType::Sweep { channels: vec![channel] },
),
];

for (ldk_type, expected_type) in variants {
assert_eq!(TransactionType::from(ldk_type), expected_type);
}
}

#[derive(Clone, Debug, PartialEq, Eq)]
struct LegacyBolt11JitKind {
hash: PaymentHash,
Expand Down
29 changes: 22 additions & 7 deletions src/tx_broadcaster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ use std::ops::Deref;
use std::sync::{Mutex as StdMutex, Weak};

use bitcoin::Transaction;
use lightning::chain::chaininterface::{BroadcasterInterface, TransactionType};
use lightning::chain::chaininterface::{
BroadcasterInterface, TransactionType as LdkTransactionType,
};
use tokio::sync::{mpsc, Mutex, MutexGuard};

use crate::logger::{log_error, LdkLogger};
Expand All @@ -22,16 +24,21 @@ const BCAST_PACKAGE_QUEUE_SIZE: usize = 50;
/// call, along with each transaction's type. Queued until the background task classifies and
/// broadcasts it. Built only via [`BroadcastPackage::new`] from such a call, so unrelated
/// transactions can't be grouped into one package by accident.
pub(crate) struct BroadcastPackage(Vec<(Transaction, TransactionType)>);
pub(crate) struct BroadcastPackage(Vec<(Transaction, Option<LdkTransactionType>)>);

impl BroadcastPackage {
/// Builds a package from the transactions of a single `broadcast_transactions` call.
fn new(txs: &[(&Transaction, TransactionType)]) -> Self {
Self(txs.iter().map(|(tx, tx_type)| ((*tx).clone(), tx_type.clone())).collect())
fn new(txs: &[(&Transaction, LdkTransactionType)]) -> Self {
Self(txs.iter().map(|(tx, tx_type)| ((*tx).clone(), Some(tx_type.clone()))).collect())
}

/// Builds a package for wallet-originated broadcasts that have no LDK classification.
fn unclassified(txs: Vec<Transaction>) -> Self {
Self(txs.into_iter().map(|tx| (tx, None)).collect())
}

/// The packaged transactions and their types, for classification.
fn transactions(&self) -> &[(Transaction, TransactionType)] {
fn transactions(&self) -> &[(Transaction, Option<LdkTransactionType>)] {
&self.0
}

Expand Down Expand Up @@ -91,18 +98,26 @@ where
let wallet_opt = self.wallet.lock().expect("lock").as_ref().and_then(Weak::upgrade);
if let Some(wallet) = wallet_opt {
for (tx, tx_type) in package.transactions() {
wallet.classify_broadcast(tx, tx_type).await?;
if let Some(tx_type) = tx_type {
wallet.classify_broadcast(tx, tx_type).await?;
}
}
}
Ok(package)
}

pub(crate) fn broadcast_unclassified_transactions(&self, txs: Vec<Transaction>) {
self.queue_sender.try_send(BroadcastPackage::unclassified(txs)).unwrap_or_else(|e| {
log_error!(self.logger, "Failed to broadcast transactions: {}", e);
});
}
}

impl<L: Deref> BroadcasterInterface for TransactionBroadcaster<L>
where
L::Target: LdkLogger,
{
fn broadcast_transactions(&self, txs: &[(&Transaction, TransactionType)]) {
fn broadcast_transactions(&self, txs: &[(&Transaction, LdkTransactionType)]) {
self.queue_sender.try_send(BroadcastPackage::new(txs)).unwrap_or_else(|e| {
log_error!(self.logger, "Failed to broadcast transactions: {}", e);
});
Expand Down
77 changes: 50 additions & 27 deletions src/wallet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use bitcoin::{
WPubkeyHash, Weight, WitnessProgram, WitnessVersion,
};
use lightning::chain::chaininterface::{
BroadcasterInterface, FundingCandidate, TransactionType as LdkTransactionType,
FundingCandidate, TransactionType as LdkTransactionType,
INCREMENTAL_RELAY_FEE_SAT_PER_1000_WEIGHT,
};
use lightning::chain::channelmonitor::ANTI_REORG_DELAY;
Expand Down Expand Up @@ -335,21 +335,12 @@ impl Wallet {
.collect();

if !txs_to_broadcast.is_empty() {
let tx_refs: Vec<(
&Transaction,
lightning::chain::chaininterface::TransactionType,
)> =
txs_to_broadcast
.iter()
.map(|tx| {
(tx, lightning::chain::chaininterface::TransactionType::Sweep { channels: vec![] })
})
.collect();
self.broadcaster.broadcast_transactions(&tx_refs);
let tx_count = txs_to_broadcast.len();
self.broadcaster.broadcast_unclassified_transactions(txs_to_broadcast);
log_info!(
self.logger,
"Rebroadcast {} unconfirmed transactions on chain tip change",
txs_to_broadcast.len()
tx_count
);
}
}
Expand Down Expand Up @@ -889,12 +880,8 @@ impl Wallet {
})?
};

self.broadcaster.broadcast_transactions(&[(
&tx,
lightning::chain::chaininterface::TransactionType::Sweep { channels: vec![] },
)]);

let txid = tx.compute_txid();
self.broadcaster.broadcast_unclassified_transactions(vec![tx]);

match send_amount {
OnchainSendAmount::ExactRetainingReserve { amount_sats, .. } => {
Expand Down Expand Up @@ -1184,9 +1171,8 @@ impl Wallet {
Ok(tx)
}

/// Classifies a funding broadcast (channel open or splice) handed to the broadcaster by LDK,
/// recording a payment for it before it is sent. Other transaction types are left for wallet
/// sync to record normally.
/// Classifies an on-chain broadcast handed to the broadcaster by LDK, recording a payment for it
/// before it is sent when it affects this node's wallet.
pub(crate) async fn classify_broadcast(
&self, tx: &Transaction, tx_type: &LdkTransactionType,
) -> Result<(), Error> {
Expand All @@ -1197,7 +1183,13 @@ impl Wallet {
LdkTransactionType::InteractiveFunding { candidates } => {
self.classify_interactive_funding(tx, candidates, tx_type.clone().into()).await
},
_ => Ok(()),
LdkTransactionType::CooperativeClose { .. }
| LdkTransactionType::UnilateralClose { .. }
| LdkTransactionType::AnchorBump { .. }
| LdkTransactionType::Claim { .. }
| LdkTransactionType::Sweep { .. } => {
self.classify_regular_broadcast(tx, tx_type.clone().into()).await
},
}
}

Expand Down Expand Up @@ -1338,6 +1330,40 @@ impl Wallet {
Ok(())
}

/// Records a non-funding LDK broadcast as an on-chain payment, tagged with its transaction type.
/// Wallet sync later refreshes confirmation status while preserving the type.
async fn classify_regular_broadcast(
&self, tx: &Transaction, tx_type: TransactionType,
) -> Result<(), Error> {
let txid = tx.compute_txid();
let (amount_msat, fee_paid_msat, direction) = self.onchain_payment_fields(tx);

if amount_msat == Some(0) && fee_paid_msat == Some(0) {
log_trace!(
self.logger,
"Not recording classified broadcast {} as a payment: no wallet-level activity",
txid,
);
return Ok(());
}

let details = PaymentDetails::new(
PaymentId(txid.to_byte_array()),
PaymentKind::Onchain {
txid,
status: ConfirmationStatus::Unconfirmed,
tx_type: Some(tx_type),
},
amount_msat,
fee_paid_msat,
direction,
PaymentStatus::Pending,
);
self.payment_store.insert_or_update(details).await?;
log_debug!(self.logger, "Recorded classified on-chain broadcast {}", txid);
Ok(())
}

/// Writes a freshly-classified funding payment to the authoritative payment store and adds a
/// pending-store index entry, so wallet sync graduates it through `ANTI_REORG_DELAY`.
async fn persist_funding_payment(
Expand Down Expand Up @@ -1719,11 +1745,6 @@ impl Wallet {

let new_txid = fee_bumped_tx.compute_txid();

self.broadcaster.broadcast_transactions(&[(
&fee_bumped_tx,
lightning::chain::chaininterface::TransactionType::Sweep { channels: vec![] },
)]);

let new_payment = self.create_payment_from_tx(
&locked_wallet,
new_txid,
Expand All @@ -1733,6 +1754,8 @@ impl Wallet {
ConfirmationStatus::Unconfirmed,
);

self.broadcaster.broadcast_unclassified_transactions(vec![fee_bumped_tx]);

let pending_payment_store =
self.create_pending_payment_from_tx(new_payment.clone(), Vec::new());

Expand Down
Loading
Loading