
Your agent's memory outlives the session. So does the attack.
Memory poisoning is a distinct threat class from prompt injection: instead of hijacking a live session, a single adversarial write corrupts an agent's persistent memory store and silently steers behavior across every future session. Pritam Dash et al. (arXiv:2606.04329, June 3, 2026) provide the first systematic study — 4 write channels, 9 structural vulnerabilities, 6 attack classes, and MPBench results showing 50.46% average ASR and 41.05% RSR across OpenClaw and HERMES. Existing prompt injection classifiers fail structurally on weak-signal attacks (PromptArmor drops from 84.4% to 42.5% TPR), because the defense must operate at the write path, not the input boundary. The article provides a 7-section production-ready Python MemoryGuard middleware synthesizing VMG, OWASP Agent Memory Guard, HMAC signed writes, contradiction detection, and bitemporal rollback, along with three prioritized configuration steps to ship.
리서치 브리프
The benchmark gap that hides the problem
"persistent memory introduces the risk of memory poisoning, where a single adversarial memory write can exert long-term influence over agent behavior" 1
Why your current defenses don't cover this
| Defense | Strong-signal TPR | Weak-signal TPR | Gap |
|---|---|---|---|
| PIGuard (off-the-shelf) | 48.33% | 28.33% | 20 pp |
| CommandSans (off-the-shelf) | 68.33% | 28.34% | 40 pp |
| PromptArmor (off-the-shelf) | 84.44% | 42.50% | 41.94 pp |
| PIGuard (re-trained) | 47.67% | 46.00% | 1.67 pp |
"adaptation provides no benefit even for a strong LLM-based guardrail, suggesting the weakness is structural rather than model or training distribution"
"Defending against memory poisoning requires defenses that operate at the write path, not the input boundary."
The 4 write channels and how they're exploited

"The self-improvement loop treats all steps that executed without error as validated, and builds subsequent revisions around the existing procedure, including any adversarially introduced steps"
memory_search deliberately) reduces RSR to 17.40% at the cost of some long-term task capability. 1"agents designed to write and retrieve memory more freely in order to perform better on long-horizon tasks are proportionally easier to poison"
Cross-validation: SafeClawBench confirms the severity

The defense: MemoryGuard write-path middleware
"The honest limit: because instruction and data share one channel, no prompt-level rule fully closes ASI06. You are containing blast radius, not eliminating the class." 3
"""
MemoryGuard — Production-Ready Memory Poisoning Defense Middleware
===================================================================
Synthesized from:
- GenAlpha signed-write + bitemporal rollback pattern
- BeyondScale layered defense architecture
- CAMS 5-layer conceptual framework (Dhivyasree et al., Elsevier 2026)
- VMG 5 primitives: WA → PV → RB → VF (arXiv:2604.16548)
- OWASP Agent Memory Guard (released 2026-06-01)
Targets OWASP ASI06: Agentic Memory and Context Poisoning.
Defends against all 6 attack classes in arXiv:2606.04329 (Dash et al.).
Usage:
guard = MemoryGuard(
policy=Policy.from_yaml("memory_policy.yaml"),
signing_key=os.urandom(32),
)
# On every memory write — this is the single chokepoint:
entry = MemoryEntry(
key="user_12.default_role",
value="viewer",
source_class=SourceClass.USER,
principal="user-001",
timestamp=datetime.utcnow(),
valid_from=datetime.utcnow(),
)
success, reason = guard.validate_write(entry)
if success:
guard.store.commit(entry)
# On every retrieval:
memories = guard.retrieve("user preferences", principal="agent-001")
# Incident response — 4-step GenAlpha playbook:
report = guard.incident_response(compromise_time=datetime(2026, 6, 20, 14, 0))
"""
import hashlib
import hmac
import json
import time
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from enum import Enum
from typing import Any, Dict, List, Optional, Set, Tuple
# ═══════════════════════════════════════════════════════════════════
# SECTION 1: Source classification (VMG-WA + VMG-PV)
# Every memory entry must carry an immutable SourceClass label.
# The write guard uses this to enforce authorization rules.
# ═══════════════════════════════════════════════════════════════════
class SourceClass(Enum):
"""Immutable source origin per GenAlpha signed-write pattern.
SYSTEM: Built-in constraints; immutable keys (e.g. customer.id).
USER: Explicit user-authored preferences or directives.
AGENT_AUTHORED: Agent-generated summaries, lessons, self-reflection.
EXTERNAL_TOOL: Tool outputs, MCP responses, RAG-retrieved documents.
"""
SYSTEM = "SYSTEM"
USER = "USER"
AGENT_AUTHORED = "AGENT_AUTHORED"
EXTERNAL_TOOL = "EXTERNAL_TOOL"
class ExecutionScope(Enum):
"""Execution authority granted at retrieval time.
CONTEXT_ONLY: Descriptive only — stripped from planning/tool blocks.
PLANNING_ALLOWED: Can influence a plan but cannot invoke tools directly.
TOOL_ACTION_ALLOWED: Full execution authority (SYSTEM / AGENT_AUTHORED only).
"""
CONTEXT_ONLY = "CONTEXT_ONLY"
PLANNING_ALLOWED = "PLANNING_ALLOWED"
TOOL_ACTION_ALLOWED = "TOOL_ACTION_ALLOWED"
@dataclass
class MemoryEntry:
"""A memory entry with full VMG provenance metadata.
Each field maps to a VMG primitive:
source_class + signature → WA (Write Authorization)
principal + parent_key → PV (Provenance Visibility)
valid_from / valid_to → RB (Rollbackability) bitemporal fields
"""
key: str
value: Any
source_class: SourceClass
principal: str
timestamp: datetime
valid_from: datetime
valid_to: Optional[datetime] = None # set when entry is superseded
signature: Optional[str] = None # HMAC over key|value|ts|source
version: int = 1
parent_key: Optional[str] = None # provenance chain linkage
# ═══════════════════════════════════════════════════════════════════
# SECTION 2: Declarative security policy
# Mirrors GenAlpha's YAML policy pattern + OWASP AMG integration.
# ═══════════════════════════════════════════════════════════════════
@dataclass
class Policy:
protected_keys: Set[str] = field(default_factory=lambda: {
"system.*", "identity.role", "auth.scopes"
})
immutable_keys: Set[str] = field(default_factory=lambda: {
"customer.id", "organization.tenant_id"
})
scope_rules: Dict[SourceClass, ExecutionScope] = field(default_factory=lambda: {
SourceClass.SYSTEM: ExecutionScope.TOOL_ACTION_ALLOWED,
SourceClass.USER: ExecutionScope.PLANNING_ALLOWED,
SourceClass.AGENT_AUTHORED: ExecutionScope.PLANNING_ALLOWED,
SourceClass.EXTERNAL_TOOL: ExecutionScope.CONTEXT_ONLY, # ← never executes
})
max_external_in_top_k: int = 2 # EXTERNAL_TOOL entries capped in retrieval
cooldown_seconds: int = 300 # anti-drift: minimum gap between self-similar writes
canary_endpoints: Set[str] = field(default_factory=lambda: {
"canary-internal-dns.local" # decoy tripwire endpoint
})
max_writes_per_minute: int = 50
@classmethod
def from_yaml(cls, path: str) -> "Policy":
"""Load policy from YAML. Replace stub with yaml.safe_load(open(path))."""
return cls()
# ═══════════════════════════════════════════════════════════════════
# SECTION 3: Write-path guard (CAMS Layer 4 + OWASP AMG)
# This is the single chokepoint. No write reaches storage
# without passing all 8 checks below in order.
# ═══════════════════════════════════════════════════════════════════
class WriteGuard:
def __init__(self, policy: Policy, signing_key: bytes):
self.policy = policy
self.signing_key = signing_key
self._write_ts: List[float] = []
self._recent_hash: Dict[str, float] = {}
def validate(self, entry: MemoryEntry) -> Tuple[bool, str]:
"""Run all write-path checks. Returns (is_safe, reason)."""
# Check 1: Protected key tamper (C1 defense)
if self._is_protected(entry.key) and entry.source_class != SourceClass.SYSTEM:
return False, f"PROTECTED_KEY_TAMPER: {entry.key}"
# Check 2: Immutable key modification
if self._is_immutable(entry.key):
return False, f"IMMUTABLE_KEY: {entry.key}"
# Check 3: SourceClass authorization
# EXTERNAL_TOOL writes must never bypass this gate directly.
if entry.source_class == SourceClass.EXTERNAL_TOOL:
return False, "EXTERNAL_TOOL writes require staging validation"
if entry.source_class == SourceClass.SYSTEM and entry.principal != "system":
return False, "Non-system principal cannot write SYSTEM entries"
# Check 4: Injection heuristic (plug OWASP AMG here in production)
if self._detect_injection(entry.value):
return False, f"INJECTION_PATTERN: {entry.key}"
# Check 5: Size anomaly (>100KB signals crafted payload)
if len(json.dumps(str(entry.value))) > 100_000:
return False, f"SIZE_ANOMALY: {entry.key}"
# Check 6: Write-frequency burst detection
now = time.time()
self._write_ts = [t for t in self._write_ts if now - t < 60]
if len(self._write_ts) >= self.policy.max_writes_per_minute:
return False, "FREQUENCY_ANOMALY: burst write"
self._write_ts.append(now)
# Check 7: Self-similarity cooldown (C3 / salience-driven attack defense)
content_hash = hashlib.sha256(
json.dumps(str(entry.value), sort_keys=True).encode()
).hexdigest()
if content_hash in self._recent_hash:
elapsed = now - self._recent_hash[content_hash]
if elapsed < self.policy.cooldown_seconds:
return False, (
f"SELF_SIMILARITY_COOLDOWN: {elapsed:.0f}s "
f"< {self.policy.cooldown_seconds}s"
)
self._recent_hash[content_hash] = now
# Check 8: Canary tripwire (GenAlpha contradiction detection pattern)
raw = json.dumps(str(entry.value)).lower()
if any(c in raw for c in self.policy.canary_endpoints):
return False, "CANARY_TRIPWIRE: entry references decoy endpoint"
return True, "OK"
def sign(self, entry: MemoryEntry) -> str:
"""HMAC-SHA256 over key | value | timestamp | source_class.
Per GenAlpha: 'sign each write with a per-agent key.'
"""
payload = (
f"{entry.key}|"
f"{json.dumps(str(entry.value), sort_keys=True)}|"
f"{entry.timestamp.isoformat()}|"
f"{entry.source_class.value}"
)
return hmac.new(self.signing_key, payload.encode(), hashlib.sha256).hexdigest()
def _is_protected(self, key: str) -> bool:
import re
return any(
re.match("^" + re.escape(p).replace(r"\*", ".*") + "$", key)
for p in self.policy.protected_keys
)
def _is_immutable(self, key: str) -> bool:
return key in self.policy.immutable_keys
def _detect_injection(self, value: Any) -> bool:
"""Stub for OWASP AMG injection scanner.
In production, replace with:
from owasp_amg import MemoryGuard as OWASP_AMG
return not OWASP_AMG().scan(json.dumps(value)).is_safe
"""
raw = json.dumps(str(value)).lower()
markers = [
"ignore previous instructions",
"you are now",
"system prompt override",
"for future sessions, remember",
"always record that",
"from now on, treat",
]
return any(m in raw for m in markers)
# ═══════════════════════════════════════════════════════════════════
# SECTION 4: Contradiction & anomaly detection (CAMS Layer 5 pattern)
# Map each incoming write to a (subject, predicate, object) triple
# and check it against the existing knowledge base. Conflict → flag.
# ═══════════════════════════════════════════════════════════════════
@dataclass
class Triplet:
subject: str
predicate: str
object: Any
class ContradictionDetector:
def __init__(self):
self._kb: Dict[str, Dict[str, Triplet]] = {}
def check(self, entry: MemoryEntry) -> Optional[str]:
"""Return conflict description if contradiction found, None otherwise."""
for t in self._extract(entry.value):
existing = self._kb.get(t.subject, {}).get(t.predicate)
if existing and existing.object != t.object:
return (
f"CONTRADICTION: ({t.subject}, {t.predicate}) "
f"stored={existing.object!r} vs incoming={t.object!r}"
)
for t in self._extract(entry.value):
self._kb.setdefault(t.subject, {})[t.predicate] = t
return None
def _extract(self, value: Any) -> List[Triplet]:
"""Simplified SPO extraction. In production: use NER + relation extraction."""
if isinstance(value, dict):
return [
Triplet("__root__", k, v)
for k, v in value.items()
if isinstance(v, (str, int, float, bool))
]
return []
# ═══════════════════════════════════════════════════════════════════
# SECTION 5: Bitemporal store with rollback (VMG-RB + VMG-VF)
# Implements the TOKI pattern (arXiv:2606.06240):
# valid_from / valid_to = fact-time (when the fact was true)
# timestamp = system-time (when the write was committed)
# 4-step GenAlpha incident-response playbook built in.
# ═══════════════════════════════════════════════════════════════════
class BitemporalStore:
def __init__(self):
self._live: Dict[str, MemoryEntry] = {}
self._audit: List[MemoryEntry] = []
self._snapshots: Dict[datetime, Dict] = {}
def commit(self, entry: MemoryEntry) -> None:
if entry.parent_key and entry.parent_key in self._live:
pred = self._live.pop(entry.parent_key)
pred.valid_to = entry.valid_from
self._audit.append(pred)
self._live[entry.key] = entry
self._audit.append(entry)
def snapshot(self) -> datetime:
"""Checkpoint for recovery. Call before any high-risk operation."""
now = datetime.utcnow()
self._snapshots[now] = {
"live": dict(self._live),
"audit_len": len(self._audit),
}
return now
def rollback_to(self, t: datetime) -> None:
"""Restore to a prior snapshot (step 3 of GenAlpha playbook)."""
if t not in self._snapshots:
raise KeyError(f"No snapshot at {t}")
snap = self._snapshots[t]
self._live = dict(snap["live"])
self._audit = self._audit[: snap["audit_len"]]
def quarantine_since(self, t: datetime) -> List[str]:
"""Move all entries written at or after t out of live store (step 2)."""
flagged = [k for k, e in self._live.items() if e.timestamp >= t]
for k in flagged:
self._audit.append(self._live.pop(k))
return flagged
def read_as_of(self, t: datetime) -> Dict[str, MemoryEntry]:
"""Constrain reads to state at time t (step 3)."""
if t in self._snapshots:
return dict(self._snapshots[t]["live"])
return {e.key: e for e in self._audit if e.timestamp <= t}
# ═══════════════════════════════════════════════════════════════════
# SECTION 6: Retrieval influence bounding (VMG-PS + GenAlpha pattern)
# EXTERNAL_TOOL entries are capped in the top-K results and ranked
# lowest in trust order. They enter the context window as
# CONTEXT_ONLY — they cannot directly trigger tool calls or planning.
# ═══════════════════════════════════════════════════════════════════
TRUST_ORDER = {
SourceClass.SYSTEM: 4,
SourceClass.USER: 3,
SourceClass.AGENT_AUTHORED: 2,
SourceClass.EXTERNAL_TOOL: 1, # lowest trust; capped at max_external_in_top_k
}
class RetrievalGuard:
def __init__(self, policy: Policy, store: BitemporalStore):
self.policy = policy
self.store = store
def scope_retrieval(
self, query: str, principal: str, top_k: int = 10
) -> List[MemoryEntry]:
candidates = [
e for e in self.store._live.values()
if principal in (e.principal, "*")
]
candidates.sort(key=lambda e: TRUST_ORDER[e.source_class], reverse=True)
result, ext_count = [], 0
for e in candidates[: top_k * 2]:
if e.source_class == SourceClass.EXTERNAL_TOOL:
if ext_count >= self.policy.max_external_in_top_k:
continue
ext_count += 1
result.append(e)
if len(result) >= top_k:
break
return result
# ═══════════════════════════════════════════════════════════════════
# SECTION 7: MemoryGuard — unified middleware
# This is the only class your agent harness needs to import.
# All six sections above are wired together here.
# ═══════════════════════════════════════════════════════════════════
class MemoryGuard:
def __init__(self, policy: Policy, signing_key: bytes):
self.policy = policy
self.write_guard = WriteGuard(policy, signing_key)
self.contradiction = ContradictionDetector()
self.store = BitemporalStore()
self.retrieval_guard = RetrievalGuard(policy, self.store)
self._last_audit = datetime.utcnow()
def validate_write(self, entry: MemoryEntry) -> Tuple[bool, str]:
"""Complete write-path pipeline. Call before every memory commit.
Execution order:
1. WriteGuard (8 checks: tamper, injection, frequency, canary, …)
2. ContradictionDetector (triplet-based knowledge-base check)
3. Cryptographic signing
"""
ok, reason = self.write_guard.validate(entry)
if not ok:
return False, reason
conflict = self.contradiction.check(entry)
if conflict:
return False, conflict
entry.signature = self.write_guard.sign(entry)
return True, "OK"
def retrieve(
self, query: str, principal: str, top_k: int = 10
) -> List[MemoryEntry]:
"""Scoped retrieval with HMAC signature verification on every entry."""
entries, verified = self.retrieval_guard.scope_retrieval(query, principal, top_k), []
for e in entries:
if e.signature:
expected = self.write_guard.sign(e)
if not hmac.compare_digest(e.signature, expected):
self.store.quarantine_since(e.timestamp)
continue
verified.append(e)
return verified
def periodic_reaudit(self) -> List[str]:
"""Re-scan all live entries for drift (CAMS Layer 5 pattern).
Call hourly — catches slow multi-session poisoning that
cleared the initial write-path checks.
"""
flagged = []
for entry in list(self.store._live.values()):
if self.contradiction.check(entry):
flagged.append(entry.key)
self._last_audit = datetime.utcnow()
return flagged
def snapshot(self) -> datetime:
return self.store.snapshot()
def rollback_to(self, t: datetime) -> None:
self.store.rollback_to(t)
def incident_response(self, compromise_time: datetime) -> Dict[str, Any]:
"""4-step GenAlpha recovery playbook. Returns forensic report.
Step 1: Pinpoint — compromise_time is the known or estimated write time.
Step 2: Quarantine — all entries written at/after compromise_time are removed.
Step 3: Isolate — read access constrained to state before compromise.
Step 4: Re-derive — application-level; re-validate safe interactions.
This step is context-specific: implement per application.
"""
quarantined = self.store.quarantine_since(compromise_time)
safe_state = self.store.read_as_of(compromise_time - timedelta(seconds=1))
return {
"compromise_time": compromise_time.isoformat(),
"quarantined_keys": quarantined,
"safe_state_size": len(safe_state),
"audit_log_entries": len(self.store._audit),
}Three configuration decisions before you ship
SourceClass at ingestion, not at write time. The write-path checks in Section 3 are only as good as the label the entry carries. Tag tool call outputs as SourceClass.EXTERNAL_TOOL the moment they return, RAG-retrieved chunks as EXTERNAL_TOOL, user-authored preferences as USER, and agent-generated summaries as AGENT_AUTHORED. Anything arriving as EXTERNAL_TOOL goes through the staging path (not the direct commit path) — Section 3 Check 3 enforces this. Leave the source label ambiguous and the guard passes everything. 3snapshot() before any high-risk operation and persist incident_response() output. The bitemporal store in Section 5 gives you rollback only if you have prior snapshots. Call guard.snapshot() before processing any batch of external documents, before any compaction event, and at session start. If periodic_reaudit() returns flagged keys, run incident_response(compromise_time) immediately, log the forensic report, and roll back. The TOKI analysis (arXiv:2606.06240) shows that keeping an LLM judge on the live write path causes replay inconsistency and audit erasure — the audit log in Section 5 keeps the judge off the live path and preserves an immutable record. 3What this doesn't cover
참고 출처
- 1arXiv:2606.04329 — From Untrusted Input to Trusted Memory
- 2arXiv:2606.18356 — SafeClawBench
- 3GenAlpha: Memory Poisoning — The Agent Attack That Survives a Reset
- 4OWASP Agent Memory Guard
- 5arXiv:2604.16548 — A Survey on Long-Term Memory Security in LLM Agents
- 6BeyondScale: AI Agent Memory Poisoning Defense Guide 2026




이 콘텐츠를 둘러싼 관점이나 맥락을 계속 보강해 보세요.