- * This class is intentionally limited to process bootstrap concerns only:
+ * This class is intentionally limited to process bootstrap and hosting
+ * concerns:
*
composes PKI runtime components using {@link PkiBootstrap},
+ *
hosts the async-bus maintenance loop via periodic
+ * {@code sweep(...)},
+ *
waits until process termination (Ctrl+C) and performs orderly
+ * shutdown.
*
*
+ *
Async bus sweep
*
- * No cryptography, persistence, or domain/business logic is performed here. The
- * public PKI API resides under {@code zeroecho.pki.api.*} and is not modified
- * by this bootstrap.
+ * The async bus requires periodic {@link AsyncBus#sweep(Instant)} calls to:
+ * expire operations past their deadline and re-synchronize open operations
+ * after restart (depending on the bus implementation).
+ *
+ *
+ *
Security
+ *
+ * Command-line arguments and configuration values are never logged because they
+ * can contain sensitive material (paths, tokens, passphrases).
*
*/
+@SuppressWarnings("PMD.DoNotUseThreads")
public final class PkiApplication {
private static final Logger LOG = Logger.getLogger(PkiApplication.class.getName());
+ /**
+ * System property controlling the async sweep interval in milliseconds.
+ *
+ *
+ * If missing or invalid, a safe default is used.
+ *
+ */
+ private static final String PROP_ASYNC_SWEEP_INTERVAL_MS = "zeroecho.pki.async.sweepIntervalMs";
+
+ /**
+ * Default async sweep interval used when not configured.
+ */
+ private static final Duration DEFAULT_ASYNC_SWEEP_INTERVAL = Duration.ofSeconds(2L);
+
+ /**
+ * Shutdown grace time for the sweep executor.
+ */
+ private static final Duration SWEEP_SHUTDOWN_GRACE = Duration.ofSeconds(10L);
+
private PkiApplication() {
throw new AssertionError("No instances.");
}
@@ -82,18 +126,184 @@ public final class PkiApplication {
LOG.info("ZeroEcho PKI starting.");
- Runtime.getRuntime().addShutdownHook(new Thread(() -> { // NOPMD
- Logger shutdownLogger = Logger.getLogger(PkiApplication.class.getName());
- PkiLogging.emitShutdownMessage(shutdownLogger, "ZeroEcho PKI stopping.");
- }, "zeroecho-pki-shutdown"));
+ CountDownLatch shutdownLatch = new CountDownLatch(1);
+
+ // closed in the shutdown routine
+ ScheduledExecutorService sweepExecutor = Executors.newSingleThreadScheduledExecutor(new SweepThreadFactory()); // NOPMD
+
+ Runtime.getRuntime()
+ .addShutdownHook(new Thread(new ShutdownHook(sweepExecutor, shutdownLatch), "zeroecho-pki-shutdown"));
try {
- // Intentionally no business logic yet. Bootstrap only.
- LOG.info("ZeroEcho PKI started (bootstrap only).");
+ AsyncBus asyncBus = PkiBootstrap.openAsyncBus();
+
+ Duration sweepInterval = readSweepInterval(DEFAULT_ASYNC_SWEEP_INTERVAL);
+
+ if (LOG.isLoggable(Level.INFO)) {
+ LOG.log(Level.INFO, "Async bus sweep enabled; intervalMs={0}", sweepInterval.toMillis());
+ }
+
+ sweepExecutor.scheduleWithFixedDelay(new SweepTask(asyncBus), 0L, sweepInterval.toMillis(),
+ TimeUnit.MILLISECONDS);
+
+ LOG.info("ZeroEcho PKI started.");
+
+ // Keep process alive until Ctrl+C (or other shutdown signal).
+ awaitShutdown(shutdownLatch);
} catch (RuntimeException ex) { // NOPMD
// Do not include user-provided inputs in the message; log the exception object.
LOG.log(Level.SEVERE, "Fatal error during PKI bootstrap.", ex);
throw ex;
}
}
+
+ private static void awaitShutdown(CountDownLatch latch) {
+ try {
+ latch.await();
+ } catch (InterruptedException ex) {
+ Thread.currentThread().interrupt();
+ if (LOG.isLoggable(Level.WARNING)) {
+ LOG.log(Level.WARNING, "Interrupted while awaiting shutdown.", ex);
+ }
+ }
+ }
+
+ private static Duration readSweepInterval(Duration defaultValue) {
+ String raw = System.getProperty(PROP_ASYNC_SWEEP_INTERVAL_MS);
+ if (raw == null || raw.isBlank()) {
+ return defaultValue;
+ }
+ try {
+ long ms = Long.parseLong(raw);
+ if (ms <= 0L) { // NOPMD
+ return defaultValue;
+ }
+ return Duration.ofMillis(ms);
+ } catch (NumberFormatException ex) {
+ if (LOG.isLoggable(Level.WARNING)) {
+ LOG.log(Level.WARNING, "Invalid async sweep interval system property; using default.", ex);
+ }
+ return defaultValue;
+ }
+ }
+
+ /**
+ * Periodic maintenance task for asynchronous PKI infrastructure.
+ *
+ *
+ * {@code SweepTask} represents a resilient, repeatable unit of work that
+ * invokes time-based maintenance logic on an {@link AsyncBus} instance. It is
+ * intended to be scheduled at a fixed rate by a background executor and must
+ * tolerate partial failures without disrupting the surrounding runtime
+ * environment.
+ *
+ *
+ *
+ * The task uses the current wall-clock time as a reference for sweep operations
+ * and deliberately suppresses runtime failures, logging them for diagnostic
+ * purposes while allowing future executions to proceed.
+ *
+ *
+ *
+ * This class is internal to the PKI bootstrap and lifecycle management logic
+ * and is not part of the public API surface.
+ *
+ * {@code ShutdownHook} is responsible for orchestrating an orderly shutdown of
+ * background sweep execution during JVM termination. It emits a structured
+ * shutdown message, initiates executor shutdown, and enforces a bounded grace
+ * period for task completion.
+ *
+ *
+ *
+ * The hook guarantees that shutdown coordination always completes by
+ * decrementing the associated {@link CountDownLatch}, regardless of whether
+ * shutdown is graceful, forced, or interrupted.
+ *
+ *
+ *
+ * This class must never throw exceptions or prevent JVM termination. All
+ * failure modes are handled internally.
+ *
+ * {@code SweepThreadFactory} creates daemon threads with a stable and
+ * descriptive naming convention suitable for operational diagnostics and log
+ * correlation. Threads produced by this factory are intentionally marked as
+ * daemon threads so that they do not prolong JVM lifetime.
+ *
+ *
+ *
+ * The factory performs no additional customization such as priority changes or
+ * uncaught-exception handlers, relying instead on executor-level policies.
+ *
+ */
+ private static final class SweepThreadFactory implements ThreadFactory {
+
+ @Override
+ public Thread newThread(Runnable r) {
+ Thread t = new Thread(r, "zeroecho-pki-async-sweep");
+ t.setDaemon(true);
+ return t;
+ }
+ }
}
diff --git a/pki/src/main/java/zeroecho/pki/api/PkiId.java b/pki/src/main/java/zeroecho/pki/api/PkiId.java
index eaaf42b..ccbe7c7 100644
--- a/pki/src/main/java/zeroecho/pki/api/PkiId.java
+++ b/pki/src/main/java/zeroecho/pki/api/PkiId.java
@@ -62,4 +62,9 @@ public record PkiId(String value) {
throw new IllegalArgumentException("value must not be null/blank");
}
}
+
+ @Override
+ public String toString() {
+ return value;
+ }
}
diff --git a/pki/src/main/java/zeroecho/pki/api/orch/OrchestrationDurabilityPolicy.java b/pki/src/main/java/zeroecho/pki/api/orch/OrchestrationDurabilityPolicy.java
new file mode 100644
index 0000000..03bb901
--- /dev/null
+++ b/pki/src/main/java/zeroecho/pki/api/orch/OrchestrationDurabilityPolicy.java
@@ -0,0 +1,114 @@
+/*******************************************************************************
+ * Copyright (C) 2025, Leo Galambos
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ *
+ * 3. All advertising materials mentioning features or use of this software must
+ * display the following acknowledgement:
+ * This product includes software developed by the Egothor project.
+ *
+ * 4. Neither the name of the copyright holder nor the names of its contributors
+ * may be used to endorse or promote products derived from this software
+ * without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ ******************************************************************************/
+package zeroecho.pki.api.orch;
+
+/**
+ * Durability policy for workflow continuation data managed by orchestrators.
+ *
+ *
+ * Orchestrators coordinate long-running operations (e.g., human approvals,
+ * remote signing, certificate issuance). Some workflows require local
+ * continuation state to resume after a JVM restart. This enum defines how such
+ * state is handled.
+ *
+ *
+ *
Examples
+ *
+ *
A remote signature workflow may wait for hours. If the remote system is
+ * the source of truth and all necessary data can be re-derived from external
+ * references (operation ID, request ID), the workflow can run in a durable mode
+ * without storing sensitive material locally.
+ *
If a workflow requires local ephemeral secrets (e.g., a one-time approval
+ * challenge that cannot be recomputed), strict mode should abort on restart
+ * (fail-closed) unless encrypted persistence is configured.
+ * The orchestrator does not persist continuation state that would be required
+ * to resume the workflow after a process restart. If the process restarts,
+ * operations that cannot be safely resumed must be cancelled or marked as
+ * failed deterministically.
+ *
+ *
+ *
+ * Example: A workflow requires an ephemeral secret that is only available in
+ * memory. In this mode, the operation is aborted on restart and the client must
+ * resubmit.
+ *
+ * The orchestrator persists only references and non-sensitive continuation
+ * state, sufficient to resume the workflow after restart. Sensitive payloads
+ * must remain outside the local state (e.g., in dedicated stores or remote
+ * systems).
+ *
+ *
+ *
+ * Example: An operation persists a reference to a stored CSR and a reference to
+ * the selected certificate profile, but does not persist raw key material or
+ * private tokens.
+ *
+ * The orchestrator persists continuation state encrypted (e.g., AES-256/GCM)
+ * using a key supplied by external configuration (environment variable, secret
+ * store, or KMS). This allows workflows to resume after restart even when they
+ * require sensitive local state.
+ *
+ *
+ *
+ * Example: A workflow stores an encrypted blob containing a transient approval
+ * context. The encryption uses authenticated data derived from operation ID,
+ * operation type, and owner identity to prevent swapping blobs across
+ * operations.
+ *
+ */
+ DURABLE_ENCRYPTED_STATE
+}
diff --git a/pki/src/main/java/zeroecho/pki/api/orch/WorkflowStateRecord.java b/pki/src/main/java/zeroecho/pki/api/orch/WorkflowStateRecord.java
new file mode 100644
index 0000000..ea09712
--- /dev/null
+++ b/pki/src/main/java/zeroecho/pki/api/orch/WorkflowStateRecord.java
@@ -0,0 +1,120 @@
+/*******************************************************************************
+ * Copyright (C) 2025, Leo Galambos
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ *
+ * 3. All advertising materials mentioning features or use of this software must
+ * display the following acknowledgement:
+ * This product includes software developed by the Egothor project.
+ *
+ * 4. Neither the name of the copyright holder nor the names of its contributors
+ * may be used to endorse or promote products derived from this software
+ * without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ ******************************************************************************/
+package zeroecho.pki.api.orch;
+
+import java.time.Instant;
+import java.util.Objects;
+import java.util.Optional;
+
+import zeroecho.pki.api.EncodedObject;
+import zeroecho.pki.api.Encoding;
+import zeroecho.pki.api.PkiId;
+import zeroecho.pki.api.audit.Principal;
+
+/**
+ * Persisted continuation state for an orchestrated workflow.
+ *
+ *
+ * This record is used by orchestrators to persist the minimum information
+ * needed to resume long-running workflows after a process restart, according to
+ * the selected {@link OrchestrationDurabilityPolicy}.
+ *
+ *
+ *
Storage model
+ *
+ * The async bus is the authoritative source for operation lifecycle and status,
+ * but it does not persist sensitive payloads and may be configured not to
+ * persist results. Orchestrators therefore persist workflow-specific
+ * continuation state through {@code PkiStore}.
+ *
+ *
+ *
Security
+ *
+ *
{@link #payload()} may contain sensitive information. It must not be
+ * logged.
+ *
When {@link #durabilityPolicy()} is
+ * {@link OrchestrationDurabilityPolicy#DURABLE_ENCRYPTED_STATE},
+ * {@link #payloadEncoding()} must reflect an encrypted form and the payload
+ * must be authenticated using AAD derived from operation identity.
+ *
+ *
+ * @param opId async operation identifier (never {@code null})
+ * @param type operation type string (never {@code null})
+ * @param owner operation owner (never {@code null})
+ * @param durabilityPolicy durability policy (never {@code null})
+ * @param createdAt creation time (never {@code null})
+ * @param updatedAt last update time (never {@code null})
+ * @param expiresAt expiration/deadline (never {@code null})
+ * @param payloadEncoding encoding of {@link #payload()} (never {@code null})
+ * @param payload optional persisted continuation payload
+ */
+public record WorkflowStateRecord(PkiId opId, String type, Principal owner,
+ OrchestrationDurabilityPolicy durabilityPolicy, Instant createdAt, Instant updatedAt, Instant expiresAt,
+ Encoding payloadEncoding, Optional payload) {
+
+ public WorkflowStateRecord {
+ Objects.requireNonNull(opId, "opId");
+ Objects.requireNonNull(type, "type");
+ Objects.requireNonNull(owner, "owner");
+ Objects.requireNonNull(durabilityPolicy, "durabilityPolicy");
+ Objects.requireNonNull(createdAt, "createdAt");
+ Objects.requireNonNull(updatedAt, "updatedAt");
+ Objects.requireNonNull(expiresAt, "expiresAt");
+ Objects.requireNonNull(payloadEncoding, "payloadEncoding");
+ Objects.requireNonNull(payload, "payload");
+
+ if (type.isBlank()) {
+ throw new IllegalArgumentException("type must not be blank.");
+ }
+ if (!expiresAt.isAfter(createdAt) && !expiresAt.equals(createdAt)) {
+ // Allow expiresAt == createdAt for immediate-expire test scenarios, but reject
+ // negative durations.
+ if (expiresAt.isBefore(createdAt)) { // NOPMD
+ throw new IllegalArgumentException("expiresAt must not be before createdAt.");
+ }
+ }
+ }
+
+ /**
+ * Returns whether the record is expired at the provided time.
+ *
+ * @param now current time (never {@code null})
+ * @return {@code true} if {@code now} is strictly after {@link #expiresAt()}
+ */
+ public boolean isExpiredAt(Instant now) {
+ Objects.requireNonNull(now, "now");
+ return now.isAfter(expiresAt);
+ }
+}
diff --git a/pki/src/main/java/zeroecho/pki/api/orch/package-info.java b/pki/src/main/java/zeroecho/pki/api/orch/package-info.java
new file mode 100644
index 0000000..c683992
--- /dev/null
+++ b/pki/src/main/java/zeroecho/pki/api/orch/package-info.java
@@ -0,0 +1,114 @@
+/*******************************************************************************
+ * Copyright (C) 2025, Leo Galambos
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ *
+ * 3. All advertising materials mentioning features or use of this software must
+ * display the following acknowledgement:
+ * This product includes software developed by the Egothor project.
+ *
+ * 4. Neither the name of the copyright holder nor the names of its contributors
+ * may be used to endorse or promote products derived from this software
+ * without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ ******************************************************************************/
+/**
+ * Orchestration and workflow durability API for ZeroEcho PKI.
+ *
+ *
+ * This package defines stable domain abstractions related to long-running
+ * PKI workflows and their durability requirements. It is concerned with
+ * describing what must be persisted and how durable such persistence
+ * must be, not with how the persistence is implemented.
+ *
+ *
+ *
Scope and responsibilities
+ *
+ * The orchestration layer in ZeroEcho PKI coordinates multi-step operations
+ * such as certificate issuance, revocation processing, key rollover, or
+ * publication workflows. These operations:
+ *
+ *
+ *
may span multiple logical steps,
+ *
may involve external systems or asynchronous callbacks,
+ *
must tolerate restarts, crashes, or redeployments.
+ *
+ *
+ *
+ * This package provides the minimal API necessary to:
+ *
+ *
+ *
describe the required durability guarantees for workflow state,
+ *
represent persisted workflow state in a storage-agnostic form,
+ *
enable deterministic recovery and audit of workflow progression.
+ *
+ *
+ *
Key abstractions
+ *
+ *
{@link zeroecho.pki.api.orch.OrchestrationDurabilityPolicy
+ * OrchestrationDurabilityPolicy} defines how durable orchestration
+ * state must be (e.g. write-through, buffered, best-effort), allowing different
+ * operational trade-offs without changing orchestration logic.
+ *
{@link zeroecho.pki.api.orch.WorkflowStateRecord WorkflowStateRecord}
+ * represents an immutable, persisted snapshot of workflow state at a specific
+ * point in time, suitable for recovery, audit, and historical inspection.
+ *
+ *
+ *
Immutability and history
+ *
+ * Workflow state records are designed to be immutable once persisted.
+ * Implementations are expected to append new state records rather than
+ * overwrite existing ones, enabling:
+ *
+ *
+ *
full audit trails,
+ *
temporal queries ("state at time T"),
+ *
post-mortem analysis of failed or aborted workflows.
+ *
+ *
+ *
+ * Whether and for how long historical records are retained is governed by
+ * policy and implementation, but the API itself is intentionally compatible
+ * with history-preserving stores.
+ *
+ *
+ *
API stability and integration
+ *
+ * The types in this package are part of the public PKI API surface. They are
+ * intended to be usable across different runtime environments, including:
+ *
+ *
+ *
standalone CLI applications,
+ *
long-running server processes,
+ *
container-managed frameworks such as Spring or Micronaut.
+ *
+ *
+ *
+ * No assumptions are made about the underlying persistence mechanism
+ * (filesystem, database, distributed log, etc.). Such concerns are handled by
+ * SPI and implementation layers.
+ *
+ *
+ * @since 1.0
+ */
+package zeroecho.pki.api.orch;
diff --git a/pki/src/main/java/zeroecho/pki/impl/async/FileBackedAsyncBusProvider.java b/pki/src/main/java/zeroecho/pki/impl/async/FileBackedAsyncBusProvider.java
new file mode 100644
index 0000000..68ef984
--- /dev/null
+++ b/pki/src/main/java/zeroecho/pki/impl/async/FileBackedAsyncBusProvider.java
@@ -0,0 +1,112 @@
+/*******************************************************************************
+ * Copyright (C) 2025, Leo Galambos
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ *
+ * 3. All advertising materials mentioning features or use of this software must
+ * display the following acknowledgement:
+ * This product includes software developed by the Egothor project.
+ *
+ * 4. Neither the name of the copyright holder nor the names of its contributors
+ * may be used to endorse or promote products derived from this software
+ * without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ ******************************************************************************/
+package zeroecho.pki.impl.async;
+
+import java.nio.file.Path;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+
+import zeroecho.pki.api.PkiId;
+import zeroecho.pki.api.audit.Principal;
+import zeroecho.pki.spi.ProviderConfig;
+import zeroecho.pki.spi.async.AsyncBusProvider;
+import zeroecho.pki.util.async.AsyncBus;
+import zeroecho.pki.util.async.codec.ResultCodec;
+import zeroecho.pki.util.async.impl.AppendOnlyLineStore;
+import zeroecho.pki.util.async.impl.DurableAsyncBus;
+
+/**
+ * Default async bus provider: in-memory state with append-only file
+ * persistence.
+ *
+ *
+ * This provider instantiates the generic durable bus with PKI-specific
+ * identifier codecs and disables result persistence. Results should be stored
+ * in dedicated PKI storage (credential store, export store, etc.) and the bus
+ * should carry only status and references.
+ *
+ *
+ *
Configuration keys
+ *
+ *
{@code logPath} (optional): append-only log file path. Default:
+ * {@code pki-async/async.log} relative to the working directory.
+ *
+ *
+ *
Security
+ *
+ * The log contains operation metadata and must be protected. The underlying
+ * store applies best-effort restrictive permissions (POSIX when available).
+ *
+ */
+public final class FileBackedAsyncBusProvider implements AsyncBusProvider {
+
+ /**
+ * Configuration key for the log file path.
+ */
+ public static final String KEY_LOG_PATH = "logPath";
+
+ @Override
+ public String id() {
+ return "file";
+ }
+
+ @Override
+ public Set supportedKeys() {
+ return Set.of(KEY_LOG_PATH);
+ }
+
+ @Override
+ public AsyncBus allocate(ProviderConfig config) {
+ Objects.requireNonNull(config, "config");
+
+ if (!id().equals(config.backendId())) {
+ throw new IllegalArgumentException("ProviderConfig backendId mismatch.");
+ }
+
+ Map props = config.properties();
+ String logPath = props.getOrDefault(KEY_LOG_PATH, Path.of("pki-async").resolve("async.log").toString());
+
+ AppendOnlyLineStore store = new AppendOnlyLineStore(Path.of(logPath));
+
+ // Default: do not persist results in the bus log.
+ ResultCodec
Select async bus provider: {@code -Dzeroecho.pki.async=<id>}
+ *
Configure async bus provider:
+ * {@code -Dzeroecho.pki.async.<key>=<value>}
*
*
*
@@ -91,6 +98,9 @@ public final class PkiBootstrap {
private static final String PROP_CRYPTO_WORKFLOW_BACKEND = "zeroecho.pki.crypto.workflow";
private static final String PROP_CRYPTO_WORKFLOW_PREFIX = "zeroecho.pki.crypto.workflow.";
+ private static final String PROP_ASYNC_BACKEND = "zeroecho.pki.async";
+ private static final String PROP_ASYNC_PREFIX = "zeroecho.pki.async.";
+
private PkiBootstrap() {
throw new AssertionError("No instances.");
}
@@ -117,13 +127,7 @@ public final class PkiBootstrap {
* are never logged; only keys may be logged.
*
*
- *
- * Defaulting rules implemented by this bootstrap (policy, not SPI requirement):
- * for {@code fs} provider, if {@code root} is not specified, defaults to
- * {@code "pki-store"} relative to the working directory.
- *
- *
- * @return opened store (never {@code null})
+ * @return store (never {@code null})
*/
public static PkiStore openStore() {
String requestedId = System.getProperty(PROP_STORE_BACKEND);
@@ -151,40 +155,11 @@ public final class PkiBootstrap {
return provider.allocate(config);
}
- /**
- * Logs provider help information (supported keys) for diagnostics.
- *
- *
- * This method is safe: it does not log configuration values.
- *
- * Selection and configuration follow the same conventions as
- * {@link #openStore()}, using {@code -Dzeroecho.pki.audit=<id>} and
- * {@code zeroecho.pki.audit.} prefixed properties.
- *
- *
- *
- * Defaulting rules implemented by this bootstrap (policy, not SPI requirement):
- * if no audit backend is specified, defaults to {@code stdout}. For
- * {@code file} provider, if {@code root} is not specified, defaults to
- * {@code "pki-audit"} relative to the working directory.
- *
- *
- * @return opened audit sink (never {@code null})
+ * @return audit sink (never {@code null})
*/
public static AuditSink openAudit() {
String requestedId = System.getProperty(PROP_AUDIT_BACKEND);
@@ -220,21 +195,7 @@ public final class PkiBootstrap {
* Opens a {@link SignatureWorkflow} using {@link SignatureWorkflowProvider}
* discovered via ServiceLoader.
*
- *
- * Security note: configuration values may be sensitive and must not be logged.
- * This bootstrap logs only provider ids and configuration keys.
- *
- *
- * @return opened signature workflow (never {@code null})
+ * @return signature workflow (never {@code null})
*/
public static SignatureWorkflow openSignatureWorkflow() {
String requestedId = System.getProperty(PROP_CRYPTO_WORKFLOW_BACKEND);
@@ -248,6 +209,7 @@ public final class PkiBootstrap {
});
Map props = SpiSystemProperties.readPrefixed(PROP_CRYPTO_WORKFLOW_PREFIX);
+
ProviderConfig config = new ProviderConfig(provider.id(), props);
if (LOG.isLoggable(Level.INFO)) {
@@ -256,4 +218,66 @@ public final class PkiBootstrap {
return provider.allocate(config);
}
+
+ /**
+ * Opens an async operation bus using {@link AsyncBusProvider} discovered via
+ * ServiceLoader.
+ *
+ *
+ * Defaulting rule: if no backend id is specified, defaults to {@code file}.
+ *
+ *
+ *
+ * Configuration properties are read from {@link System#getProperties()} using
+ * the prefix {@code zeroecho.pki.async.}. Values are treated as sensitive and
+ * are never logged; only keys may be logged.
+ *
+ * This method is safe: it does not log configuration values.
+ *
+ *
+ * @param provider provider (never {@code null})
+ */
+ public static void logSupportedKeys(ConfigurableProvider provider) {
+ Objects.requireNonNull(provider, "provider");
+ if (LOG.isLoggable(Level.FINE)) {
+ LOG.fine("Provider '" + provider.id() + "' supports keys: " + provider.supportedKeys());
+ }
+ }
}
diff --git a/pki/src/main/java/zeroecho/pki/spi/framework/CredentialFrameworkProvider.java b/pki/src/main/java/zeroecho/pki/spi/framework/CredentialFrameworkProvider.java
new file mode 100644
index 0000000..0f0a6fa
--- /dev/null
+++ b/pki/src/main/java/zeroecho/pki/spi/framework/CredentialFrameworkProvider.java
@@ -0,0 +1,102 @@
+/*******************************************************************************
+ * Copyright (C) 2025, Leo Galambos
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ *
+ * 3. All advertising materials mentioning features or use of this software must
+ * display the following acknowledgement:
+ * This product includes software developed by the Egothor project.
+ *
+ * 4. Neither the name of the copyright holder nor the names of its contributors
+ * may be used to endorse or promote products derived from this software
+ * without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ ******************************************************************************/
+package zeroecho.pki.spi.framework;
+
+import java.util.Objects;
+
+import zeroecho.pki.spi.ConfigurableProvider;
+import zeroecho.pki.spi.ProviderConfig;
+
+/**
+ * Service provider for {@link CredentialFramework} instances.
+ *
+ *
+ * The PKI runtime selects a framework provider using
+ * {@link java.util.ServiceLoader} and instantiates it through
+ * {@link #allocate(ProviderConfig)}. Provider selection is performed by
+ * {@code PkiBootstrap} using configuration properties (similarly to store,
+ * audit, async bus, and signature workflow providers).
+ *
+ *
+ *
Dependency boundary
+ *
+ * Framework implementations may internally rely on third-party toolkits (for
+ * example Bouncy Castle), but such dependencies must not leak through this SPI.
+ * The SPI must remain stable and independent of a particular cryptographic
+ * provider.
+ *
+ */
+public interface CredentialFrameworkProvider extends ConfigurableProvider {
+
+ /**
+ * Allocates a credential framework instance using the provided configuration.
+ *
+ *
+ * Implementations must validate that {@link ProviderConfig#backendId()} matches
+ * {@link #id()}. A mismatch must be reported as
+ * {@link IllegalArgumentException}.
+ *
+ *
+ * @param config provider configuration (never {@code null})
+ * @return allocated framework (never {@code null})
+ * @throws NullPointerException if {@code config} is {@code null}
+ * @throws IllegalArgumentException if {@code config.backendId()} does not match
+ * {@link #id()}
+ * @throws RuntimeException if allocation fails
+ */
+ @Override
+ CredentialFramework allocate(ProviderConfig config);
+
+ /**
+ * Enforces that the provided configuration is intended for this provider.
+ *
+ *
+ * This helper is intended for defensive checks inside provider implementations.
+ *
+ *
+ * @param provider provider instance (never {@code null})
+ * @param config provider configuration (never {@code null})
+ * @throws NullPointerException if any argument is {@code null}
+ * @throws IllegalArgumentException if the backend id does not match
+ */
+ static void requireIdMatch(final CredentialFrameworkProvider provider, final ProviderConfig config) {
+ Objects.requireNonNull(provider, "provider");
+ Objects.requireNonNull(config, "config");
+
+ if (!provider.id().equals(config.backendId())) {
+ throw new IllegalArgumentException("ProviderConfig backendId mismatch.");
+ }
+ }
+}
diff --git a/pki/src/main/java/zeroecho/pki/spi/store/PkiStore.java b/pki/src/main/java/zeroecho/pki/spi/store/PkiStore.java
index a02f6d5..01dd288 100644
--- a/pki/src/main/java/zeroecho/pki/spi/store/PkiStore.java
+++ b/pki/src/main/java/zeroecho/pki/spi/store/PkiStore.java
@@ -40,6 +40,7 @@ import java.util.Optional;
import zeroecho.pki.api.PkiId;
import zeroecho.pki.api.ca.CaRecord;
import zeroecho.pki.api.credential.Credential;
+import zeroecho.pki.api.orch.WorkflowStateRecord;
import zeroecho.pki.api.policy.PolicyTrace;
import zeroecho.pki.api.profile.CertificateProfile;
import zeroecho.pki.api.publication.PublicationRecord;
@@ -48,31 +49,27 @@ import zeroecho.pki.api.revocation.RevokedRecord;
import zeroecho.pki.api.status.StatusObject;
/**
- * Persistence abstraction for PKI state.
+ * Persistence boundary for PKI state.
*
*
- * This Service Provider Interface (SPI) defines the authoritative storage
- * contract for all PKI-managed state, including CA entities, issued
- * credentials, certification requests, revocations, status objects, profiles,
- * publications, and policy traces.
+ * This Service Provider Interface (SPI) defines durable storage for the PKI
+ * domain model. Implementations may use a filesystem, database, or other
+ * backend, but must preserve deterministic behavior, atomicity guarantees, and
+ * must not leak secrets through logging or exception messages.
*
*
+ *
Security
*
- * The interface is intentionally coarse-grained and framework-agnostic.
- * Implementations are responsible for providing appropriate durability,
- * consistency, and concurrency guarantees according to the deployment model
- * (filesystem, embedded storage, RDBMS, distributed store, etc.).
+ * Implementations must protect persisted data appropriately (for example:
+ * filesystem permissions, database ACLs). Sensitive material must never be
+ * written to logs. Exceptions should avoid leaking secrets in their messages.
*
*
+ *
Error handling
*
- * Security requirements:
+ * This SPI uses unchecked failures. Implementations should throw
+ * {@link IllegalStateException} when an operation cannot be completed safely.
*
- *
- *
Private key material MUST NOT be persisted.
- *
Secrets must never be stored in cleartext.
- *
Stored objects must be treated as immutable unless explicitly
- * replaced.
- *
*/
public interface PkiStore {
@@ -80,62 +77,56 @@ public interface PkiStore {
* Persists or updates a Certificate Authority (CA) record.
*
*
- * Implementations must ensure that CA records are stored atomically. Replacing
- * an existing CA record must preserve historical integrity (e.g., previously
- * issued credentials must remain resolvable).
+ * Implementations must store CA records atomically. Replacing an existing
+ * record should be either fully visible or not visible at all.
*
*
- * @param record CA record to persist
- * @throws IllegalArgumentException if {@code record} is null
- * @throws RuntimeException if persistence fails
+ * @param record CA record (never {@code null})
+ * @throws NullPointerException if {@code record} is {@code null}
+ * @throws IllegalStateException if persistence fails
*/
void putCa(CaRecord record);
/**
- * Retrieves a CA record by its identifier.
+ * Retrieves a CA record.
*
- * @param caId CA identifier
- * @return CA record if present, otherwise {@link Optional#empty()}
- * @throws IllegalArgumentException if {@code caId} is null
- * @throws RuntimeException if retrieval fails
+ * @param caId CA identifier (never {@code null})
+ * @return CA record if present
+ * @throws NullPointerException if {@code caId} is {@code null}
+ * @throws IllegalStateException if retrieval fails
*/
Optional getCa(PkiId caId);
/**
- * Lists all CA records known to the store.
+ * Lists all stored CA records.
*
- *
- * No filtering is applied at this level; higher layers are expected to perform
- * query-based filtering.
- *
- *
- * @return list of CA records (never null)
- * @throws RuntimeException if listing fails
+ * @return list of CA records (never {@code null})
+ * @throws IllegalStateException if listing fails
*/
List listCas();
/**
- * Persists an issued credential.
+ * Persists a credential record.
*
*
- * Credentials are immutable once stored. Re-inserting an existing credential
- * identifier should either be idempotent or rejected, depending on
- * implementation policy.
+ * Credentials are typically immutable once issued. Implementations must ensure
+ * that storing a credential is atomic and that the record remains retrievable
+ * for its entire retention period.
*
*
- * @param credential credential to persist
- * @throws IllegalArgumentException if {@code credential} is null
- * @throws RuntimeException if persistence fails
+ * @param credential credential record (never {@code null})
+ * @throws NullPointerException if {@code credential} is {@code null}
+ * @throws IllegalStateException if persistence fails
*/
void putCredential(Credential credential);
/**
- * Retrieves an issued credential by its identifier.
+ * Retrieves a credential record.
*
- * @param credentialId credential identifier
- * @return credential if present
- * @throws IllegalArgumentException if {@code credentialId} is null
- * @throws RuntimeException if retrieval fails
+ * @param credentialId credential identifier (never {@code null})
+ * @return credential record if present
+ * @throws NullPointerException if {@code credentialId} is {@code null}
+ * @throws IllegalStateException if retrieval fails
*/
Optional getCredential(PkiId credentialId);
@@ -143,162 +134,233 @@ public interface PkiStore {
* Persists a parsed certification request.
*
*
- * Stored requests are used for audit, correlation, re-issuance, and ACME-like
- * workflows.
+ * Requests are typically immutable artifacts. Implementations should treat them
+ * as write-once records unless there is a clear need to support updates.
*
*
- * @param request parsed certification request
- * @throws IllegalArgumentException if {@code request} is null
- * @throws RuntimeException if persistence fails
+ * @param request parsed request (never {@code null})
+ * @throws NullPointerException if {@code request} is {@code null}
+ * @throws IllegalStateException if persistence fails
*/
void putRequest(ParsedCertificationRequest request);
/**
- * Retrieves a stored certification request.
+ * Retrieves a parsed certification request.
*
- * @param requestId request identifier
+ * @param requestId request identifier (never {@code null})
* @return parsed request if present
- * @throws IllegalArgumentException if {@code requestId} is null
- * @throws RuntimeException if retrieval fails
+ * @throws NullPointerException if {@code requestId} is {@code null}
+ * @throws IllegalStateException if retrieval fails
*/
Optional getRequest(PkiId requestId);
/**
- * Persists a revocation record.
+ * Persists or updates a revocation record.
*
- *
- * Revocation records are authoritative inputs for generating revocation status
- * objects (CRLs, OCSP, etc.).
- *
- *
- * @param record revocation record
- * @throws IllegalArgumentException if {@code record} is null
- * @throws RuntimeException if persistence fails
+ * @param record revocation record (never {@code null})
+ * @throws NullPointerException if {@code record} is {@code null}
+ * @throws IllegalStateException if persistence fails
*/
void putRevocation(RevokedRecord record);
/**
- * Retrieves the revocation record for a credential.
+ * Retrieves a revocation record for a given credential.
*
- * @param credentialId credential identifier
+ * @param credentialId credential identifier (never {@code null})
* @return revocation record if present
- * @throws IllegalArgumentException if {@code credentialId} is null
- * @throws RuntimeException if retrieval fails
+ * @throws NullPointerException if {@code credentialId} is {@code null}
+ * @throws IllegalStateException if retrieval fails
*/
Optional getRevocation(PkiId credentialId);
/**
* Lists all revocation records.
*
- * @return list of revocation records (never null)
- * @throws RuntimeException if listing fails
+ * @return list of revocation records (never {@code null})
+ * @throws IllegalStateException if listing fails
*/
List listRevocations();
/**
- * Persists a generated status object.
+ * Persists a status object.
*
*
- * Status objects include CRLs, delta CRLs, OCSP responses, or
- * framework-specific revocation lists.
+ * Status objects are typically published artifacts (for example OCSP responses,
+ * CRLs, or other status representations) and are usually immutable once
+ * created.
*
*
- * @param object status object to persist
- * @throws IllegalArgumentException if {@code object} is null
- * @throws RuntimeException if persistence fails
+ * @param object status object (never {@code null})
+ * @throws NullPointerException if {@code object} is {@code null}
+ * @throws IllegalStateException if persistence fails
*/
void putStatusObject(StatusObject object);
/**
- * Retrieves a status object by its identifier.
+ * Retrieves a status object.
*
- * @param statusObjectId status object identifier
+ * @param statusObjectId status object identifier (never {@code null})
* @return status object if present
- * @throws IllegalArgumentException if {@code statusObjectId} is null
- * @throws RuntimeException if retrieval fails
+ * @throws NullPointerException if {@code statusObjectId} is {@code null}
+ * @throws IllegalStateException if retrieval fails
*/
Optional getStatusObject(PkiId statusObjectId);
/**
- * Lists all status objects issued by a given CA.
+ * Lists status objects issued under a specific issuer CA.
*
- * @param issuerCaId issuer CA identifier
- * @return list of status objects (never null)
- * @throws IllegalArgumentException if {@code issuerCaId} is null
- * @throws RuntimeException if listing fails
+ *
+ * This method exists to support issuer-scoped publication and retrieval
+ * patterns (for example: listing all CRLs or other status artifacts produced by
+ * a given CA).
+ *
+ *
+ * @param issuerCaId issuer CA identifier (never {@code null})
+ * @return list of status objects for the issuer (never {@code null})
+ * @throws NullPointerException if {@code issuerCaId} is {@code null}
+ * @throws IllegalStateException if listing fails
*/
List listStatusObjects(PkiId issuerCaId);
/**
- * Persists a publication record.
+ * Persists or updates a publication record.
*
*
- * Publication records provide traceability and operational diagnostics for
- * artifact distribution.
+ * Publication records describe distribution state (for example: where and when
+ * an object was published). These records may be used for operational
+ * monitoring and reconciliation.
*
*
- * @param record publication record
- * @throws IllegalArgumentException if {@code record} is null
- * @throws RuntimeException if persistence fails
+ * @param record publication record (never {@code null})
+ * @throws NullPointerException if {@code record} is {@code null}
+ * @throws IllegalStateException if persistence fails
*/
void putPublicationRecord(PublicationRecord record);
/**
* Lists all publication records.
*
- * @return list of publication records (never null)
- * @throws RuntimeException if listing fails
+ * @return list of publication records (never {@code null})
+ * @throws IllegalStateException if listing fails
*/
List listPublicationRecords();
/**
* Persists or updates a certificate profile.
*
- * @param profile certificate profile
- * @throws IllegalArgumentException if {@code profile} is null
- * @throws RuntimeException if persistence fails
+ *
+ * A certificate profile represents a reusable issuance template (for example:
+ * VPN client, VPN server, S/MIME). The profile may be referenced by higher
+ * layers during certificate issuance.
+ *
+ *
+ * @param profile certificate profile (never {@code null})
+ * @throws NullPointerException if {@code profile} is {@code null}
+ * @throws IllegalStateException if persistence fails
*/
void putProfile(CertificateProfile profile);
/**
- * Retrieves a certificate profile by identifier.
+ * Retrieves a certificate profile by profile identifier.
*
- * @param profileId profile identifier
+ *
+ * The identifier is a stable, system-defined key (not a display name). It
+ * should be suitable for configuration and API use (for example:
+ * {@code "vpn-client"}).
+ *
+ *
+ * @param profileId profile identifier (never {@code null})
* @return profile if present
- * @throws IllegalArgumentException if {@code profileId} is null or blank
- * @throws RuntimeException if retrieval fails
+ * @throws NullPointerException if {@code profileId} is {@code null}
+ * @throws IllegalStateException if retrieval fails
*/
Optional getProfile(String profileId);
/**
* Lists all stored certificate profiles.
*
- * @return list of profiles (never null)
- * @throws RuntimeException if listing fails
+ * @return list of profiles (never {@code null})
+ * @throws IllegalStateException if listing fails
*/
List listProfiles();
/**
- * Persists a policy evaluation trace.
+ * Persists a policy trace.
*
*
- * Policy traces are used for explainability, audit, and compliance evidence.
- * They must never contain sensitive data.
+ * Policy traces capture decision-making and evaluation information. These
+ * records may contain sensitive meta-information and must be protected by the
+ * underlying store.
*
*
- * @param trace policy trace
- * @throws IllegalArgumentException if {@code trace} is null
- * @throws RuntimeException if persistence fails
+ * @param trace policy trace (never {@code null})
+ * @throws NullPointerException if {@code trace} is {@code null}
+ * @throws IllegalStateException if persistence fails
*/
void putPolicyTrace(PolicyTrace trace);
/**
- * Retrieves a policy trace by decision identifier.
+ * Retrieves a policy trace.
*
- * @param decisionId policy decision identifier
+ * @param decisionId policy decision identifier (never {@code null})
* @return policy trace if present
- * @throws IllegalArgumentException if {@code decisionId} is null
- * @throws RuntimeException if retrieval fails
+ * @throws NullPointerException if {@code decisionId} is {@code null}
+ * @throws IllegalStateException if retrieval fails
*/
Optional getPolicyTrace(PkiId decisionId);
+
+ // -------------------------------------------------------------------------
+ // Workflow continuation state (orchestration)
+ // -------------------------------------------------------------------------
+
+ /**
+ * Persists or updates an orchestrator workflow continuation record.
+ *
+ *
+ * The workflow payload may be encrypted. Implementations must treat it as
+ * sensitive and must not log it.
+ *
+ *
+ * @param record workflow state record (never {@code null})
+ * @throws NullPointerException if {@code record} is {@code null}
+ * @throws IllegalStateException if persistence fails
+ */
+ void putWorkflowState(WorkflowStateRecord record);
+
+ /**
+ * Retrieves workflow continuation state for a given operation.
+ *
+ * @param opId operation identifier (never {@code null})
+ * @return workflow state record if present
+ * @throws NullPointerException if {@code opId} is {@code null}
+ * @throws IllegalStateException if retrieval fails
+ */
+ Optional getWorkflowState(PkiId opId);
+
+ /**
+ * Deletes workflow continuation state for a given operation.
+ *
+ *
+ * Implementations may retain historical snapshots according to their history
+ * policy. Deletion guarantees only removal of the current record.
+ *
+ *
+ * @param opId operation identifier (never {@code null})
+ * @throws NullPointerException if {@code opId} is {@code null}
+ * @throws IllegalStateException if deletion fails
+ */
+ void deleteWorkflowState(PkiId opId);
+
+ /**
+ * Lists workflow continuation state records.
+ *
+ *
+ * This method is intended for administrative reconciliation and recovery.
+ * Higher layers should apply access control and filtering as appropriate.
+ *
+ *
+ * @return list of workflow state records (never {@code null})
+ * @throws IllegalStateException if listing fails
+ */
+ List listWorkflowStates();
}
diff --git a/pki/src/main/java/zeroecho/pki/util/PkiIds.java b/pki/src/main/java/zeroecho/pki/util/PkiIds.java
new file mode 100644
index 0000000..f0fd4e6
--- /dev/null
+++ b/pki/src/main/java/zeroecho/pki/util/PkiIds.java
@@ -0,0 +1,88 @@
+/*******************************************************************************
+ * Copyright (C) 2025, Leo Galambos
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ *
+ * 3. All advertising materials mentioning features or use of this software must
+ * display the following acknowledgement:
+ * This product includes software developed by the Egothor project.
+ *
+ * 4. Neither the name of the copyright holder nor the names of its contributors
+ * may be used to endorse or promote products derived from this software
+ * without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ ******************************************************************************/
+package zeroecho.pki.util;
+
+import java.util.Objects;
+import java.util.UUID;
+
+import zeroecho.pki.api.PkiId;
+
+/**
+ * Utility methods for generating and validating {@link PkiId} values.
+ *
+ *
+ * The PKI subsystem uses {@link PkiId} as a stable identifier for domain
+ * records and long-running operations. This helper centralizes ID generation to
+ * avoid ad-hoc implementations scattered across providers and orchestration
+ * code.
+ *
+ *
+ *
Security
+ *
+ * Generated identifiers must not embed secrets. The default generator uses a
+ * random UUID which is suitable for correlation and storage keys but does not
+ * reveal secret material.
+ *
+ */
+public final class PkiIds {
+
+ private PkiIds() {
+ throw new AssertionError("No instances.");
+ }
+
+ /**
+ * Generates a new random PKI identifier.
+ *
+ *
+ * The returned identifier is based on {@link UUID#randomUUID()}.
+ *
+ *
+ * @return new identifier (never {@code null})
+ */
+ public static PkiId newRandomId() {
+ return new PkiId(UUID.randomUUID().toString());
+ }
+
+ /**
+ * Validates that the provided identifier is non-null.
+ *
+ * @param id identifier to validate
+ * @return the same identifier for fluent usage
+ * @throws IllegalArgumentException if {@code id} is {@code null}
+ */
+ public static PkiId require(PkiId id) {
+ return Objects.requireNonNull(id, "id");
+ }
+}
diff --git a/pki/src/main/java/zeroecho/pki/util/async/AsyncBus.java b/pki/src/main/java/zeroecho/pki/util/async/AsyncBus.java
new file mode 100644
index 0000000..7461f51
--- /dev/null
+++ b/pki/src/main/java/zeroecho/pki/util/async/AsyncBus.java
@@ -0,0 +1,156 @@
+/*******************************************************************************
+ * Copyright (C) 2025, Leo Galambos
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ *
+ * 3. All advertising materials mentioning features or use of this software must
+ * display the following acknowledgement:
+ * This product includes software developed by the Egothor project.
+ *
+ * 4. Neither the name of the copyright holder nor the names of its contributors
+ * may be used to endorse or promote products derived from this software
+ * without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ ******************************************************************************/
+package zeroecho.pki.util.async;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Optional;
+
+/**
+ * Generic asynchronous operation bus.
+ *
+ *
+ * The bus is responsible for:
+ *
+ *
+ *
tracking operation metadata and status transitions,
+ * Results are retained until explicitly retrieved (or expired by retention
+ * policy). Implementations must remove stored results once
+ * {@link #consumeResult(Object)} returns a value. This ensures bounded memory
+ * usage.
+ *
+ *
+ * @param operation id type
+ * @param owner type
+ * @param endpoint id type
+ * @param result type
+ */
+public interface AsyncBus {
+
+ /**
+ * Registers an endpoint instance under an identifier.
+ *
+ *
+ * Registration must be performed by the endpoint itself during construction or
+ * activation, not by bootstrap code, to avoid tight coupling.
+ *
+ *
+ * @param endpointId endpoint id (never {@code null})
+ * @param endpoint endpoint instance (never {@code null})
+ */
+ void registerEndpoint(EndpointId endpointId, AsyncEndpoint endpoint);
+
+ /**
+ * Subscribes a handler to status change events.
+ *
+ * @param handler handler (never {@code null})
+ * @return registration handle (never {@code null})
+ */
+ AsyncRegistration subscribe(AsyncHandler handler);
+
+ /**
+ * Creates a new operation record and sets initial status to
+ * {@link AsyncState#SUBMITTED}.
+ *
+ * @param opId operation id (never {@code null})
+ * @param type operation type (never blank)
+ * @param owner owner (never {@code null})
+ * @param endpointId endpoint id (never {@code null})
+ * @param createdAt creation time (never {@code null})
+ * @param ttl time-to-live (must be positive)
+ * @return created snapshot (never {@code null})
+ */
+ AsyncOperationSnapshot submit(OpId opId, String type, Owner owner, EndpointId endpointId,
+ Instant createdAt, Duration ttl);
+
+ /**
+ * Updates status and optional result for an operation.
+ *
+ *
+ * Implementations must persist the transition before dispatching events.
+ *
+ *
+ * @param opId operation id (never {@code null})
+ * @param status new status (never {@code null})
+ * @param result optional result (never {@code null})
+ */
+ void update(OpId opId, AsyncStatus status, Optional result);
+
+ /**
+ * Retrieves the latest known status for an operation.
+ *
+ * @param opId operation id (never {@code null})
+ * @return status if known
+ */
+ Optional status(OpId opId);
+
+ /**
+ * Returns immutable operation metadata if present.
+ *
+ * @param opId operation id (never {@code null})
+ * @return snapshot if known
+ */
+ Optional> snapshot(OpId opId);
+
+ /**
+ * Consumes the stored result if present and removes it from the bus.
+ *
+ * @param opId operation id (never {@code null})
+ * @return result if present (and then removed)
+ */
+ Optional consumeResult(OpId opId);
+
+ /**
+ * Performs periodic maintenance:
+ *
+ *
+ *
expires operations past their deadline,
+ *
polls endpoints for open operations (restart recovery),
+ *
dispatches any discovered transitions.
+ *
+ *
+ * @param now current time (never {@code null})
+ */
+ void sweep(Instant now);
+}
diff --git a/pki/src/main/java/zeroecho/pki/util/async/AsyncEndpoint.java b/pki/src/main/java/zeroecho/pki/util/async/AsyncEndpoint.java
new file mode 100644
index 0000000..c62ddc8
--- /dev/null
+++ b/pki/src/main/java/zeroecho/pki/util/async/AsyncEndpoint.java
@@ -0,0 +1,85 @@
+/*******************************************************************************
+ * Copyright (C) 2025, Leo Galambos
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ *
+ * 3. All advertising materials mentioning features or use of this software must
+ * display the following acknowledgement:
+ * This product includes software developed by the Egothor project.
+ *
+ * 4. Neither the name of the copyright holder nor the names of its contributors
+ * may be used to endorse or promote products derived from this software
+ * without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ ******************************************************************************/
+package zeroecho.pki.util.async;
+
+import java.time.Instant;
+import java.util.Optional;
+
+/**
+ * Target endpoint that owns/executes operations and can be asked for their
+ * status.
+ *
+ *
+ * The bus uses endpoints to:
+ *
+ *
+ *
poll open operations on {@link AsyncBus#sweep(Instant)} (restart
+ * recovery),
+ *
deliver status/result updates via
+ * {@link AsyncBus#update(Object, AsyncStatus, Optional)}.
+ *
+ *
+ * @param operation id type
+ * @param result type
+ */
+public interface AsyncEndpoint {
+
+ /**
+ * Returns a best-effort current status for a known operation.
+ *
+ *
+ * Endpoints should return a terminal status if the operation is complete. If
+ * unknown, endpoints may return an empty optional.
+ *
+ *
+ * @param opId operation id (never {@code null})
+ * @return status if known
+ */
+ Optional status(OpId opId);
+
+ /**
+ * Returns a best-effort result for a successful operation.
+ *
+ *
+ * Endpoints should return a value only if the operation has succeeded and the
+ * result is retrievable. If results are stored elsewhere, this may return
+ * empty.
+ *
+ *
+ * @param opId operation id (never {@code null})
+ * @return result if available
+ */
+ Optional result(OpId opId);
+}
diff --git a/pki/src/main/java/zeroecho/pki/util/async/AsyncEvent.java b/pki/src/main/java/zeroecho/pki/util/async/AsyncEvent.java
new file mode 100644
index 0000000..09b40ff
--- /dev/null
+++ b/pki/src/main/java/zeroecho/pki/util/async/AsyncEvent.java
@@ -0,0 +1,65 @@
+/*******************************************************************************
+ * Copyright (C) 2025, Leo Galambos
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ *
+ * 3. All advertising materials mentioning features or use of this software must
+ * display the following acknowledgement:
+ * This product includes software developed by the Egothor project.
+ *
+ * 4. Neither the name of the copyright holder nor the names of its contributors
+ * may be used to endorse or promote products derived from this software
+ * without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ ******************************************************************************/
+package zeroecho.pki.util.async;
+
+import java.util.Objects;
+import java.util.Optional;
+
+/**
+ * Event delivered to registered handlers whenever an operation status changes.
+ *
+ * @param operation identifier type
+ * @param operation owner type
+ * @param endpoint identifier type
+ * @param result type
+ * @param snapshot immutable operation metadata (never {@code null})
+ * @param status new status (never {@code null})
+ * @param result optional result; present only when
+ * {@link AsyncStatus#state()} is {@link AsyncState#SUCCEEDED}
+ */
+public record AsyncEvent(AsyncOperationSnapshot snapshot,
+ AsyncStatus status, Optional result) {
+
+ /**
+ * Creates an event.
+ *
+ * @throws IllegalArgumentException if any required field is null
+ */
+ public AsyncEvent {
+ Objects.requireNonNull(snapshot, "snapshot");
+ Objects.requireNonNull(status, "status");
+ Objects.requireNonNull(result, "result");
+ }
+}
diff --git a/pki/src/main/java/zeroecho/pki/util/async/AsyncHandler.java b/pki/src/main/java/zeroecho/pki/util/async/AsyncHandler.java
new file mode 100644
index 0000000..9e718b7
--- /dev/null
+++ b/pki/src/main/java/zeroecho/pki/util/async/AsyncHandler.java
@@ -0,0 +1,60 @@
+/*******************************************************************************
+ * Copyright (C) 2025, Leo Galambos
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ *
+ * 3. All advertising materials mentioning features or use of this software must
+ * display the following acknowledgement:
+ * This product includes software developed by the Egothor project.
+ *
+ * 4. Neither the name of the copyright holder nor the names of its contributors
+ * may be used to endorse or promote products derived from this software
+ * without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ ******************************************************************************/
+package zeroecho.pki.util.async;
+
+/**
+ * Callback handler for async operation events.
+ *
+ *
+ * Implementations must be exception-safe: the bus will treat any handler
+ * exception as a handler failure and will continue dispatching to other
+ * handlers.
+ *
+ *
+ * @param operation id type
+ * @param owner type
+ * @param endpoint id type
+ * @param result type
+ */
+@FunctionalInterface
+public interface AsyncHandler {
+
+ /**
+ * Receives an event notification.
+ *
+ * @param event event (never {@code null})
+ */
+ void onEvent(AsyncEvent event);
+}
diff --git a/pki/src/main/java/zeroecho/pki/util/async/AsyncOperationSnapshot.java b/pki/src/main/java/zeroecho/pki/util/async/AsyncOperationSnapshot.java
new file mode 100644
index 0000000..246e8eb
--- /dev/null
+++ b/pki/src/main/java/zeroecho/pki/util/async/AsyncOperationSnapshot.java
@@ -0,0 +1,81 @@
+/*******************************************************************************
+ * Copyright (C) 2025, Leo Galambos
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ *
+ * 3. All advertising materials mentioning features or use of this software must
+ * display the following acknowledgement:
+ * This product includes software developed by the Egothor project.
+ *
+ * 4. Neither the name of the copyright holder nor the names of its contributors
+ * may be used to endorse or promote products derived from this software
+ * without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ ******************************************************************************/
+package zeroecho.pki.util.async;
+
+import java.time.Instant;
+import java.util.Objects;
+
+/**
+ * Immutable snapshot of an operation's identity and routing metadata.
+ *
+ *
+ * This object intentionally does not contain secrets. Payloads should be stored
+ * outside the bus (or referenced indirectly).
+ *
+ *
+ * @param operation identifier type
+ * @param operation owner type
+ * @param endpoint identifier type
+ * @param opId operation id (never {@code null})
+ * @param type operation type string (never blank)
+ * @param owner operation owner (never {@code null})
+ * @param endpointId endpoint id (never {@code null})
+ * @param createdAt creation time (never {@code null})
+ * @param expiresAt expiration deadline (never {@code null})
+ */
+public record AsyncOperationSnapshot(OpId opId, String type, Owner owner,
+ EndpointId endpointId, Instant createdAt, Instant expiresAt) {
+
+ /**
+ * Creates a snapshot and validates invariants.
+ *
+ * @throws IllegalArgumentException if required fields are null/blank or expiry
+ * is invalid
+ */
+ public AsyncOperationSnapshot {
+ Objects.requireNonNull(opId, "opId");
+ Objects.requireNonNull(type, "type");
+ Objects.requireNonNull(owner, "owner");
+ Objects.requireNonNull(endpointId, "endpointId");
+ Objects.requireNonNull(createdAt, "createdAt");
+ Objects.requireNonNull(expiresAt, "expiresAt");
+ if (type.isBlank()) {
+ throw new IllegalArgumentException("type must not be blank");
+ }
+ if (!expiresAt.isAfter(createdAt)) {
+ throw new IllegalArgumentException("expiresAt must be after createdAt");
+ }
+ }
+}
diff --git a/pki/src/main/java/zeroecho/pki/internal/package-info.java b/pki/src/main/java/zeroecho/pki/util/async/AsyncRegistration.java
similarity index 82%
rename from pki/src/main/java/zeroecho/pki/internal/package-info.java
rename to pki/src/main/java/zeroecho/pki/util/async/AsyncRegistration.java
index 04d2fe1..fe04867 100644
--- a/pki/src/main/java/zeroecho/pki/internal/package-info.java
+++ b/pki/src/main/java/zeroecho/pki/util/async/AsyncRegistration.java
@@ -32,7 +32,24 @@
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
******************************************************************************/
+package zeroecho.pki.util.async;
+
+import java.io.Closeable;
+
/**
- *
+ * Registration handle for a handler subscription.
+ *
+ *
+ * Closing the handle unregisters the handler. Implementations must be
+ * idempotent.
+ *
*/
-package zeroecho.pki.internal;
\ No newline at end of file
+@SuppressWarnings("PMD.ImplicitFunctionalInterface")
+public interface AsyncRegistration extends Closeable {
+
+ /**
+ * Unregisters the handler.
+ */
+ @Override
+ void close();
+}
diff --git a/pki/src/main/java/zeroecho/pki/util/async/AsyncState.java b/pki/src/main/java/zeroecho/pki/util/async/AsyncState.java
new file mode 100644
index 0000000..5589b43
--- /dev/null
+++ b/pki/src/main/java/zeroecho/pki/util/async/AsyncState.java
@@ -0,0 +1,109 @@
+/*******************************************************************************
+ * Copyright (C) 2025, Leo Galambos
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ *
+ * 3. All advertising materials mentioning features or use of this software must
+ * display the following acknowledgement:
+ * This product includes software developed by the Egothor project.
+ *
+ * 4. Neither the name of the copyright holder nor the names of its contributors
+ * may be used to endorse or promote products derived from this software
+ * without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ ******************************************************************************/
+package zeroecho.pki.util.async;
+
+/**
+ * Lifecycle state of an asynchronous operation.
+ *
+ *
+ * The state machine is intentionally simple and audit-friendly. Implementations
+ * may attach additional non-sensitive detail through
+ * {@link AsyncStatus#detailCode()} and {@link AsyncStatus#details()}.
+ *
+ */
+public enum AsyncState {
+
+ /**
+ * The operation has been created and is awaiting processing.
+ *
+ *
+ * Example: a certificate issuance request has been accepted, but is waiting for
+ * an operator approval step.
+ *
+ */
+ SUBMITTED,
+
+ /**
+ * The operation is currently being processed.
+ *
+ *
+ * Example: an issuance request is being built, proof-of-possession verified, or
+ * a signing workflow is executing.
+ *
+ */
+ RUNNING,
+
+ /**
+ * The operation completed successfully and the result is available for
+ * retrieval.
+ *
+ *
+ * Example: a certificate (or bundle) has been issued and is ready to be
+ * fetched.
+ *
+ * Example: proof-of-possession verification failed or a policy rule rejected
+ * the request. The failure reason should be exposed via a stable, non-secret
+ * {@link AsyncStatus#detailCode()} and optional {@link AsyncStatus#details()}.
+ *
+ */
+ FAILED,
+
+ /**
+ * The operation was cancelled by a caller or operator.
+ *
+ *
+ * Example: an administrator cancelled a long-running operation before it was
+ * approved.
+ *
+ */
+ CANCELLED,
+
+ /**
+ * The operation exceeded its declared deadline and was expired by the bus.
+ *
+ *
+ * Example: a request had a 24h approval window; after that it is no longer
+ * eligible for completion and is treated as expired.
+ *
+ */
+ EXPIRED
+}
diff --git a/pki/src/main/java/zeroecho/pki/util/async/AsyncStatus.java b/pki/src/main/java/zeroecho/pki/util/async/AsyncStatus.java
new file mode 100644
index 0000000..a981129
--- /dev/null
+++ b/pki/src/main/java/zeroecho/pki/util/async/AsyncStatus.java
@@ -0,0 +1,80 @@
+/*******************************************************************************
+ * Copyright (C) 2025, Leo Galambos
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ *
+ * 3. All advertising materials mentioning features or use of this software must
+ * display the following acknowledgement:
+ * This product includes software developed by the Egothor project.
+ *
+ * 4. Neither the name of the copyright holder nor the names of its contributors
+ * may be used to endorse or promote products derived from this software
+ * without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ ******************************************************************************/
+package zeroecho.pki.util.async;
+
+import java.time.Instant;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+
+/**
+ * Non-sensitive status snapshot of an asynchronous operation.
+ *
+ *
+ * This object is safe to expose via APIs and logs (assuming the producer
+ * respects the no-secrets constraint for detail fields).
+ *
+ *
+ * @param state lifecycle state (never {@code null})
+ * @param updatedAt last update time (never {@code null})
+ * @param detailCode optional stable, non-sensitive code suitable for audit and
+ * transport
+ * @param details optional key/value details; values MUST NOT contain secrets
+ */
+public record AsyncStatus(AsyncState state, Instant updatedAt, Optional detailCode,
+ Map details) {
+
+ /**
+ * Creates a status record and validates invariants.
+ *
+ * @throws IllegalArgumentException if any mandatory field is null
+ */
+ public AsyncStatus {
+ Objects.requireNonNull(state, "state");
+ Objects.requireNonNull(updatedAt, "updatedAt");
+ Objects.requireNonNull(detailCode, "detailCode");
+ Objects.requireNonNull(details, "details");
+ }
+
+ /**
+ * Returns whether this status is terminal.
+ *
+ * @return true if terminal
+ */
+ public boolean isTerminal() {
+ return state == AsyncState.SUCCEEDED || state == AsyncState.FAILED || state == AsyncState.CANCELLED
+ || state == AsyncState.EXPIRED;
+ }
+}
diff --git a/pki/src/main/java/zeroecho/pki/util/async/codec/IdCodec.java b/pki/src/main/java/zeroecho/pki/util/async/codec/IdCodec.java
new file mode 100644
index 0000000..6d2674b
--- /dev/null
+++ b/pki/src/main/java/zeroecho/pki/util/async/codec/IdCodec.java
@@ -0,0 +1,63 @@
+/*******************************************************************************
+ * Copyright (C) 2025, Leo Galambos
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ *
+ * 3. All advertising materials mentioning features or use of this software must
+ * display the following acknowledgement:
+ * This product includes software developed by the Egothor project.
+ *
+ * 4. Neither the name of the copyright holder nor the names of its contributors
+ * may be used to endorse or promote products derived from this software
+ * without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ ******************************************************************************/
+package zeroecho.pki.util.async.codec;
+
+/**
+ * String codec for stable persistence of identifiers.
+ *
+ *
+ * The encoded form MUST NOT embed secrets. It is intended for identifiers only.
+ *
+ *
+ * @param identifier type
+ */
+public interface IdCodec {
+
+ /**
+ * Encodes a value as a stable, filesystem-safe string token.
+ *
+ * @param value value (never {@code null})
+ * @return encoded token (never blank)
+ */
+ String encode(T value);
+
+ /**
+ * Decodes a previously encoded token.
+ *
+ * @param token token (never blank)
+ * @return decoded value (never {@code null})
+ */
+ T decode(String token);
+}
diff --git a/pki/src/main/java/zeroecho/pki/util/async/codec/ResultCodec.java b/pki/src/main/java/zeroecho/pki/util/async/codec/ResultCodec.java
new file mode 100644
index 0000000..196f2c8
--- /dev/null
+++ b/pki/src/main/java/zeroecho/pki/util/async/codec/ResultCodec.java
@@ -0,0 +1,114 @@
+/*******************************************************************************
+ * Copyright (C) 2025, Leo Galambos
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ *
+ * 3. All advertising materials mentioning features or use of this software must
+ * display the following acknowledgement:
+ * This product includes software developed by the Egothor project.
+ *
+ * 4. Neither the name of the copyright holder nor the names of its contributors
+ * may be used to endorse or promote products derived from this software
+ * without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ ******************************************************************************/
+package zeroecho.pki.util.async.codec;
+
+import java.util.Optional;
+
+/**
+ * Optional persistence codec for results.
+ *
+ *
+ * If results are sensitive or large, do not persist them in the bus log.
+ * Instead, store results in a dedicated storage layer and persist only
+ * references.
+ *
+ *
+ * @param result type
+ */
+public interface ResultCodec {
+
+ /**
+ * Encodes a result to a single-line string.
+ *
+ * @param result result (never {@code null})
+ * @return encoded token (never blank)
+ */
+ String encode(R result);
+
+ /**
+ * Decodes a previously encoded result token.
+ *
+ * @param token token (never blank)
+ * @return decoded result
+ */
+ R decode(String token);
+
+ /**
+ * Indicates whether this codec persists results in the bus log.
+ *
+ * @return true if results are persisted; false if results are not persisted
+ */
+ default boolean persistsResults() {
+ return true;
+ }
+
+ /**
+ * Convenience: a codec that never persists any result.
+ *
+ * @param result type
+ * @return codec that rejects encode/decode and signals non-persistence
+ */
+ static ResultCodec none() {
+ return new ResultCodec<>() {
+ @Override
+ public String encode(R result) {
+ throw new UnsupportedOperationException("Result persistence disabled.");
+ }
+
+ @Override
+ public R decode(String token) {
+ throw new UnsupportedOperationException("Result persistence disabled.");
+ }
+
+ @Override
+ public boolean persistsResults() {
+ return false;
+ }
+ };
+ }
+
+ /**
+ * Convenience: optional decode wrapper.
+ *
+ * @param token token
+ * @return optional decoded result (empty if token is empty)
+ */
+ default Optional decodeOptional(String token) {
+ if (token == null || token.isBlank()) {
+ return Optional.empty();
+ }
+ return Optional.of(decode(token));
+ }
+}
diff --git a/pki/src/main/java/zeroecho/pki/util/async/codec/package-info.java b/pki/src/main/java/zeroecho/pki/util/async/codec/package-info.java
new file mode 100644
index 0000000..a388df9
--- /dev/null
+++ b/pki/src/main/java/zeroecho/pki/util/async/codec/package-info.java
@@ -0,0 +1,53 @@
+/*******************************************************************************
+ * Copyright (C) 2025, Leo Galambos
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ *
+ * 3. All advertising materials mentioning features or use of this software must
+ * display the following acknowledgement:
+ * This product includes software developed by the Egothor project.
+ *
+ * 4. Neither the name of the copyright holder nor the names of its contributors
+ * may be used to endorse or promote products derived from this software
+ * without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ ******************************************************************************/
+/**
+ * Codecs for persisting identifiers and (optionally) results of async
+ * operations.
+ *
+ *
+ * The async bus persists identifiers via
+ * {@link zeroecho.pki.util.async.codec.IdCodec}. Result persistence is optional
+ * and may be disabled using
+ * {@link zeroecho.pki.util.async.codec.ResultCodec#none()}.
+ *
+ *
+ *
Security
+ *
+ * Encoded forms must not contain secrets. If a subsystem needs to persist
+ * sensitive results, it must do so in a dedicated encrypted store and persist
+ * only a reference identifier in the bus.
+ *
+ */
+package zeroecho.pki.util.async.codec;
diff --git a/pki/src/main/java/zeroecho/pki/util/async/impl/AppendOnlyLineStore.java b/pki/src/main/java/zeroecho/pki/util/async/impl/AppendOnlyLineStore.java
new file mode 100644
index 0000000..513bfd1
--- /dev/null
+++ b/pki/src/main/java/zeroecho/pki/util/async/impl/AppendOnlyLineStore.java
@@ -0,0 +1,179 @@
+/*******************************************************************************
+ * Copyright (C) 2025, Leo Galambos
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ *
+ * 3. All advertising materials mentioning features or use of this software must
+ * display the following acknowledgement:
+ * This product includes software developed by the Egothor project.
+ *
+ * 4. Neither the name of the copyright holder nor the names of its contributors
+ * may be used to endorse or promote products derived from this software
+ * without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ ******************************************************************************/
+package zeroecho.pki.util.async.impl;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.FileAlreadyExistsException;
+import java.nio.file.FileSystems;
+import java.nio.file.Files;
+import java.nio.file.OpenOption;
+import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
+import java.nio.file.attribute.PosixFilePermission;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Append-only UTF-8 line store with best-effort POSIX permissions.
+ *
+ *
+ * This is a minimal durability helper for the async bus. It does not interpret
+ * content; it only appends and replays lines.
+ *
+ */
+public final class AppendOnlyLineStore {
+
+ private static final Logger LOG = Logger.getLogger(AppendOnlyLineStore.class.getName());
+
+ private final Path file;
+
+ /**
+ * Creates a store backed by the provided file path.
+ *
+ * @param file store file (never {@code null})
+ */
+ public AppendOnlyLineStore(Path file) {
+ Objects.requireNonNull(file, "file");
+ this.file = file;
+ ensureSecureFile(file);
+ }
+
+ /**
+ * Appends a single line (with trailing '\n') to the store.
+ *
+ * @param line line (never {@code null}, may be empty but not null)
+ */
+ public void appendLine(String line) {
+ Objects.requireNonNull(line, "line");
+
+ OpenOption[] opts = { StandardOpenOption.CREATE, StandardOpenOption.WRITE, StandardOpenOption.APPEND };
+
+ try (BufferedWriter w = Files.newBufferedWriter(file, StandardCharsets.UTF_8, opts)) {
+ w.write(line);
+ w.write('\n');
+ } catch (IOException ex) {
+ if (LOG.isLoggable(Level.SEVERE)) {
+ LOG.log(Level.SEVERE, "Failed to append async store line to file: " + file.getFileName(), ex);
+ }
+ throw new IllegalStateException("Failed to append async store line.", ex);
+ }
+ }
+
+ /**
+ * Reads all lines from the store.
+ *
+ * @return list of lines (never {@code null})
+ */
+ public List readAllLines() {
+ if (!Files.exists(file)) {
+ return List.of();
+ }
+ try (BufferedReader r = Files.newBufferedReader(file, StandardCharsets.UTF_8)) {
+ return r.lines().toList();
+ } catch (IOException ex) {
+ if (LOG.isLoggable(Level.SEVERE)) {
+ LOG.log(Level.SEVERE, "Failed to read async store file: " + file.getFileName(), ex);
+ }
+ throw new IllegalStateException("Failed to read async store file.", ex);
+ }
+ }
+
+ private static void ensureSecureFile(Path file) {
+ Path dir = file.getParent();
+ if (dir != null) {
+ ensureSecureDirectory(dir);
+ }
+
+ if (!Files.exists(file)) {
+ try {
+ Files.createFile(file);
+ } catch (FileAlreadyExistsException ex) {
+ // ignore
+ LOG.log(Level.FINE, "File {0} already exists, using it", file);
+ } catch (IOException ex) {
+ throw new IllegalStateException("Failed to create async store file: " + file, ex);
+ }
+ }
+
+ applyFilePermissions(file);
+ }
+
+ private static void ensureSecureDirectory(Path dir) {
+ try {
+ Files.createDirectories(dir);
+ } catch (IOException ex) {
+ throw new IllegalStateException("Failed to create async store directory: " + dir, ex);
+ }
+ applyDirectoryPermissions(dir);
+ }
+
+ private static void applyDirectoryPermissions(Path dir) {
+ if (!supportsPosix(dir)) {
+ return;
+ }
+ Set perms = EnumSet.of(PosixFilePermission.OWNER_READ, PosixFilePermission.OWNER_WRITE,
+ PosixFilePermission.OWNER_EXECUTE);
+ try {
+ Files.setPosixFilePermissions(dir, perms);
+ } catch (IOException ex) {
+ // best-effort, do not fail startup
+ LOG.log(Level.WARNING, "Failed to set POSIX permissions on async directory.", ex);
+ }
+ }
+
+ private static void applyFilePermissions(Path file) {
+ if (!supportsPosix(file)) {
+ return;
+ }
+ Set perms = EnumSet.of(PosixFilePermission.OWNER_READ, PosixFilePermission.OWNER_WRITE);
+ try {
+ Files.setPosixFilePermissions(file, perms);
+ } catch (IOException ex) {
+ // best-effort, do not fail startup
+ LOG.log(Level.WARNING, "Failed to set POSIX permissions on async file.", ex);
+ }
+ }
+
+ private static boolean supportsPosix(Path path) {
+ return FileSystems.getDefault().supportedFileAttributeViews().contains("posix") && path != null;
+ }
+}
diff --git a/pki/src/main/java/zeroecho/pki/util/async/impl/DurableAsyncBus.java b/pki/src/main/java/zeroecho/pki/util/async/impl/DurableAsyncBus.java
new file mode 100644
index 0000000..5d28c04
--- /dev/null
+++ b/pki/src/main/java/zeroecho/pki/util/async/impl/DurableAsyncBus.java
@@ -0,0 +1,559 @@
+/*******************************************************************************
+ * Copyright (C) 2025, Leo Galambos
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ *
+ * 3. All advertising materials mentioning features or use of this software must
+ * display the following acknowledgement:
+ * This product includes software developed by the Egothor project.
+ *
+ * 4. Neither the name of the copyright holder nor the names of its contributors
+ * may be used to endorse or promote products derived from this software
+ * without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ ******************************************************************************/
+package zeroecho.pki.util.async.impl;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import zeroecho.pki.util.async.AsyncBus;
+import zeroecho.pki.util.async.AsyncEndpoint;
+import zeroecho.pki.util.async.AsyncEvent;
+import zeroecho.pki.util.async.AsyncHandler;
+import zeroecho.pki.util.async.AsyncOperationSnapshot;
+import zeroecho.pki.util.async.AsyncRegistration;
+import zeroecho.pki.util.async.AsyncState;
+import zeroecho.pki.util.async.AsyncStatus;
+import zeroecho.pki.util.async.codec.IdCodec;
+import zeroecho.pki.util.async.codec.ResultCodec;
+
+/**
+ * Generic durable async bus implementation: in-memory state + append-only line
+ * log for recovery.
+ *
+ *
+ * Log format is internal and versioned by a leading token. Corrupted lines are
+ * ignored with a warning (without logging line content).
+ *
+ *
+ *
No-secret logging
+ *
+ * This implementation logs only operation ids (encoded) and types, never
+ * payloads or serialized results.
+ *
+ *
+ * @param operation id type
+ * @param owner type
+ * @param endpoint id type
+ * @param result type
+ */
+public final class DurableAsyncBus // NOPMD
+ implements AsyncBus {
+
+ private static final Logger LOG = Logger.getLogger(DurableAsyncBus.class.getName());
+
+ private static final String REC_SNAPSHOT = "S1";
+ private static final String REC_STATUS = "T1";
+ private static final String REC_RESULT = "R1";
+
+ private final IdCodec opIdCodec;
+ private final IdCodec ownerCodec;
+ private final IdCodec endpointCodec;
+ private final ResultCodec resultCodec;
+ private final AppendOnlyLineStore store;
+
+ private final Map> active;
+ private final Map lastStatus;
+ private final Map results; // removed on consume
+ private final Map> endpointsById;
+ private final List> handlers;
+
+ /**
+ * Creates the bus and immediately replays persisted state.
+ *
+ * @param opIdCodec codec for operation ids (never {@code null})
+ * @param ownerCodec codec for owners (never {@code null})
+ * @param endpointCodec codec for endpoints (never {@code null})
+ * @param resultCodec codec for results (never {@code null})
+ * @param store append-only store (never {@code null})
+ */
+ public DurableAsyncBus(IdCodec opIdCodec, IdCodec ownerCodec, IdCodec endpointCodec,
+ ResultCodec resultCodec, AppendOnlyLineStore store) {
+ this.opIdCodec = Objects.requireNonNull(opIdCodec, "opIdCodec");
+ this.ownerCodec = Objects.requireNonNull(ownerCodec, "ownerCodec");
+ this.endpointCodec = Objects.requireNonNull(endpointCodec, "endpointCodec");
+ this.resultCodec = Objects.requireNonNull(resultCodec, "resultCodec");
+ this.store = Objects.requireNonNull(store, "store");
+
+ this.active = Collections.synchronizedMap(new LinkedHashMap<>());
+ this.lastStatus = Collections.synchronizedMap(new HashMap<>());
+ this.results = Collections.synchronizedMap(new HashMap<>());
+ this.endpointsById = Collections.synchronizedMap(new HashMap<>());
+ this.handlers = Collections.synchronizedList(new ArrayList<>());
+
+ replay();
+ }
+
+ @Override
+ public void registerEndpoint(EndpointId endpointId, AsyncEndpoint endpoint) {
+ Objects.requireNonNull(endpointId, "endpointId");
+ Objects.requireNonNull(endpoint, "endpoint");
+ endpointsById.put(endpointId, endpoint);
+
+ if (LOG.isLoggable(Level.INFO)) {
+ LOG.log(Level.INFO, "Registered async endpoint id={0}", new Object[] { safeEndpoint(endpointId) });
+ }
+ }
+
+ @Override
+ public AsyncRegistration subscribe(AsyncHandler handler) {
+ Objects.requireNonNull(handler, "handler");
+ handlers.add(handler);
+ return new AsyncRegistration() {
+ private volatile boolean closed; // NOPMD
+
+ @Override
+ public void close() {
+ if (closed) {
+ return;
+ }
+ closed = true;
+ handlers.remove(handler);
+ }
+ };
+ }
+
+ @Override
+ public AsyncOperationSnapshot submit(OpId opId, String type, Owner owner,
+ EndpointId endpointId, Instant createdAt, Duration ttl) {
+
+ Objects.requireNonNull(opId, "opId");
+ Objects.requireNonNull(type, "type");
+ Objects.requireNonNull(owner, "owner");
+ Objects.requireNonNull(endpointId, "endpointId");
+ Objects.requireNonNull(createdAt, "createdAt");
+ Objects.requireNonNull(ttl, "ttl");
+ if (type.isBlank()) {
+ throw new IllegalArgumentException("type must not be blank");
+ }
+ if (ttl.isZero() || ttl.isNegative()) {
+ throw new IllegalArgumentException("ttl must be positive");
+ }
+
+ Instant expiresAt = createdAt.plus(ttl);
+ AsyncOperationSnapshot snap = new AsyncOperationSnapshot<>(opId, type, owner,
+ endpointId, createdAt, expiresAt);
+
+ active.put(opId, snap);
+
+ AsyncStatus init = new AsyncStatus(AsyncState.SUBMITTED, createdAt, Optional.of("SUBMITTED"),
+ Map.of("type", type));
+ lastStatus.put(opId, init);
+
+ store.appendLine(encodeSnapshotLine(snap));
+ store.appendLine(encodeStatusLine(opId, init));
+
+ dispatchEvent(opId, snap, init, Optional.empty());
+
+ if (LOG.isLoggable(Level.INFO)) {
+ LOG.log(Level.INFO, "Submitted async opId={0} type={1} endpoint={2}",
+ new Object[] { safeOpId(opId), type, safeEndpoint(endpointId) });
+ }
+
+ return snap;
+ }
+
+ @Override
+ public void update(OpId opId, AsyncStatus status, Optional result) {
+ Objects.requireNonNull(opId, "opId");
+ Objects.requireNonNull(status, "status");
+ Objects.requireNonNull(result, "result");
+
+ AsyncOperationSnapshot snap = active.get(opId);
+ if (snap == null) {
+ // unknown op; ignore (or strict throw) - choose ignore to allow idempotent
+ // updates after cleanup
+ if (LOG.isLoggable(Level.FINE)) {
+ LOG.log(Level.FINE, "Ignoring update for unknown async opId={0}", new Object[] { safeOpId(opId) });
+ }
+ return;
+ }
+
+ lastStatus.put(opId, status);
+ store.appendLine(encodeStatusLine(opId, status));
+
+ if (result.isPresent()) {
+ if (resultCodec.persistsResults()) {
+ store.appendLine(encodeResultLine(opId, result.get()));
+ }
+ results.put(opId, result.get());
+ }
+
+ dispatchEvent(opId, snap, status, result);
+
+ if (status.isTerminal()) {
+ // keep snapshot+status until result consumed, but allow cleanup if no result
+ // exists
+ if (status.state() != AsyncState.SUCCEEDED) { // NOPMD
+ deleteOperation(opId);
+ }
+ }
+ }
+
+ @Override
+ public Optional status(OpId opId) {
+ Objects.requireNonNull(opId, "opId");
+ return Optional.ofNullable(lastStatus.get(opId));
+ }
+
+ @Override
+ public Optional> snapshot(OpId opId) {
+ Objects.requireNonNull(opId, "opId");
+ return Optional.ofNullable(active.get(opId));
+ }
+
+ @Override
+ public Optional consumeResult(OpId opId) {
+ Objects.requireNonNull(opId, "opId");
+
+ Result r = results.remove(opId);
+ if (r == null) {
+ return Optional.empty();
+ }
+
+ // on successful consumption, forget operation entirely (bounded storage)
+ deleteOperation(opId);
+
+ if (LOG.isLoggable(Level.INFO)) {
+ LOG.log(Level.INFO, "Consumed async result opId={0}", new Object[] { safeOpId(opId) });
+ }
+
+ return Optional.of(r);
+ }
+
+ @Override
+ public void sweep(Instant now) {
+ Objects.requireNonNull(now, "now");
+
+ // 1) expire
+ sweepExpired(now);
+
+ // 2) poll open ops (best-effort)
+ List> open = List.copyOf(active.values());
+ for (AsyncOperationSnapshot snap : open) {
+ OpId opId = snap.opId();
+ AsyncStatus st = lastStatus.get(opId);
+ if (st != null && st.isTerminal()) {
+ continue;
+ }
+
+ AsyncEndpoint endpoint = endpointsById.get(snap.endpointId());
+ if (endpoint == null) {
+ continue;
+ }
+
+ Optional statusOpt;
+ try {
+ statusOpt = endpoint.status(opId);
+ } catch (RuntimeException ex) { // NOPMD
+ // endpoint misbehaved; do not fail sweep
+ if (LOG.isLoggable(Level.WARNING)) {
+ LOG.log(Level.WARNING, "Async endpoint status() failed; opId=" + safeOpId(opId), ex);
+ }
+ continue;
+ }
+
+ if (statusOpt.isEmpty()) {
+ continue;
+ }
+
+ AsyncStatus polled = statusOpt.get();
+ AsyncStatus prev = lastStatus.get(opId);
+ if (prev != null && prev.updatedAt().equals(polled.updatedAt()) && prev.state() == polled.state()) {
+ continue;
+ }
+
+ Optional res = Optional.empty();
+ if (polled.state() == AsyncState.SUCCEEDED) {
+ try {
+ res = endpoint.result(opId);
+ } catch (RuntimeException ex) { // NOPMD
+ if (LOG.isLoggable(Level.WARNING)) {
+ LOG.log(Level.WARNING, "Async endpoint result() failed; opId=" + safeOpId(opId), ex);
+ }
+ }
+ }
+
+ update(opId, polled, res);
+ }
+ }
+
+ private void sweepExpired(Instant now) {
+ List>> entries = List.copyOf(active.entrySet());
+ for (Map.Entry> e : entries) {
+ OpId opId = e.getKey();
+ AsyncOperationSnapshot snap = e.getValue();
+
+ if (!now.isAfter(snap.expiresAt())) {
+ continue;
+ }
+
+ AsyncStatus expired = new AsyncStatus(AsyncState.EXPIRED, now, Optional.of("EXPIRED"), // NOPMD
+ Map.of("reason", "deadline-exceeded"));
+
+ // prev is intentionally not used: we keep it to allow future diagnostics
+ // (e.g., breakpoint inspection). If you prefer strictness, remove the
+ // assignment.
+ @SuppressWarnings("unused")
+ AsyncStatus prev = lastStatus.put(opId, expired);
+
+ store.appendLine(encodeStatusLine(opId, expired));
+
+ if (LOG.isLoggable(Level.INFO)) {
+ LOG.log(Level.INFO, "Expired async operation opId={0} type={1}",
+ new Object[] { safeOpId(opId), snap.type() }); // NOPMD
+ }
+
+ dispatchEvent(opId, snap, expired, Optional.empty());
+
+ deleteOperation(opId);
+ }
+ }
+
+ private void deleteOperation(OpId opId) {
+ active.remove(opId);
+ lastStatus.remove(opId);
+ results.remove(opId);
+ }
+
+ private void dispatchEvent(OpId opId, AsyncOperationSnapshot snap, AsyncStatus status,
+ Optional result) {
+
+ AsyncEvent event = new AsyncEvent<>(snap, status, result);
+
+ List> copy = List.copyOf(handlers);
+ for (AsyncHandler h : copy) {
+ try {
+ h.onEvent(event);
+ } catch (RuntimeException ex) { // NOPMD
+ if (LOG.isLoggable(Level.WARNING)) {
+ LOG.log(Level.WARNING, "Async handler failed; opId=" + safeOpId(opId), ex);
+ }
+ }
+ }
+ }
+
+ private void replay() {
+ List lines = store.readAllLines();
+ if (lines.isEmpty()) {
+ return;
+ }
+
+ int applied = 0;
+ for (String line : lines) {
+ if (line == null || line.isBlank()) {
+ continue;
+ }
+ try {
+ if (line.startsWith(REC_SNAPSHOT + "|")) {
+ applySnapshotLine(line);
+ applied++;
+ } else if (line.startsWith(REC_STATUS + "|")) {
+ applyStatusLine(line);
+ applied++;
+ } else if (line.startsWith(REC_RESULT + "|")) {
+ applyResultLine(line);
+ applied++;
+ }
+ } catch (RuntimeException ex) { // NOPMD
+ // corrupted line: ignore safely without logging contents
+ LOG.log(Level.WARNING, "Corrupted async store line encountered; skipping.", ex);
+ }
+ }
+
+ if (LOG.isLoggable(Level.INFO)) {
+ LOG.log(Level.INFO, "Replayed async store lines applied={0} activeOps={1}",
+ new Object[] { applied, active.size() });
+ }
+ }
+
+ private void applySnapshotLine(String line) {
+ String[] parts = split(line, 7);
+ // S1|opId|type|owner|endpoint|createdAt|expiresAt
+ OpId opId = opIdCodec.decode(parts[1]);
+ String type = parts[2];
+ Owner owner = ownerCodec.decode(parts[3]);
+ EndpointId endpointId = endpointCodec.decode(parts[4]);
+ Instant createdAt = Instant.parse(parts[5]);
+ Instant expiresAt = Instant.parse(parts[6]);
+
+ AsyncOperationSnapshot snap = new AsyncOperationSnapshot<>(opId, type, owner,
+ endpointId, createdAt, expiresAt);
+ active.put(opId, snap);
+ }
+
+ private void applyStatusLine(String line) {
+ String[] parts = split(line, 6);
+ // T1|opId|state|updatedAt|detailCode|k=v&k=v
+ OpId opId = opIdCodec.decode(parts[1]);
+ AsyncState state = AsyncState.valueOf(parts[2]);
+ Instant updatedAt = Instant.parse(parts[3]);
+ Optional dc = parts[4].isBlank() ? Optional.empty() : Optional.of(parts[4]);
+ Map details = decodeDetails(parts[5]);
+
+ AsyncStatus st = new AsyncStatus(state, updatedAt, dc, details);
+ lastStatus.put(opId, st);
+ }
+
+ private void applyResultLine(String line) {
+ if (!resultCodec.persistsResults()) {
+ return;
+ }
+ String[] parts = split(line, 3);
+ // R1|opId|resultToken
+ OpId opId = opIdCodec.decode(parts[1]);
+ Result r = resultCodec.decode(parts[2]);
+ results.put(opId, r);
+ }
+
+ private String encodeSnapshotLine(AsyncOperationSnapshot snap) {
+ return REC_SNAPSHOT + "|" + opIdCodec.encode(snap.opId()) + "|" + snap.type() + "|"
+ + ownerCodec.encode(snap.owner()) + "|" + endpointCodec.encode(snap.endpointId()) + "|"
+ + snap.createdAt().toString() + "|" + snap.expiresAt().toString();
+ }
+
+ private String encodeStatusLine(OpId opId, AsyncStatus st) {
+ String dc = st.detailCode().orElse("");
+ String details = encodeDetails(st.details());
+ return REC_STATUS + "|" + opIdCodec.encode(opId) + "|" + st.state().name() + "|" + st.updatedAt().toString()
+ + "|" + dc + "|" + details;
+ }
+
+ private String encodeResultLine(OpId opId, Result r) {
+ return REC_RESULT + "|" + opIdCodec.encode(opId) + "|" + resultCodec.encode(r);
+ }
+
+ private static String encodeDetails(Map details) {
+ if (details == null || details.isEmpty()) {
+ return "";
+ }
+ StringBuilder sb = new StringBuilder(64);
+ boolean first = true;
+ for (Map.Entry e : details.entrySet()) {
+ if (!first) {
+ sb.append('&');
+ }
+ first = false;
+ sb.append(escape(e.getKey())).append('=').append(escape(e.getValue()));
+ }
+ return sb.toString();
+ }
+
+ private static Map decodeDetails(String token) {
+ if (token == null || token.isBlank()) {
+ return Map.of();
+ }
+ Map out = new LinkedHashMap<>();
+ String[] pairs = token.split("&");
+ for (String p : pairs) {
+ int idx = p.indexOf('=');
+ if (idx <= 0) {
+ continue;
+ }
+ String k = unescape(p.substring(0, idx));
+ String v = unescape(p.substring(idx + 1));
+ out.put(k, v);
+ }
+ return Map.copyOf(out);
+ }
+
+ private static String escape(String s) {
+ if (s == null) {
+ return "";
+ }
+ return s.replace("\\", "\\\\").replace("&", "\\&").replace("=", "\\=");
+ }
+
+ private static String unescape(String s) {
+ if (s == null || s.isEmpty()) {
+ return "";
+ }
+ StringBuilder sb = new StringBuilder(s.length());
+ boolean esc = false;
+ for (int i = 0; i < s.length(); i++) {
+ char c = s.charAt(i);
+ if (esc) {
+ sb.append(c);
+ esc = false;
+ } else if (c == '\\') { // NOPMD
+ esc = true;
+ } else {
+ sb.append(c);
+ }
+ }
+ return sb.toString();
+ }
+
+ private static String[] split(String line, int expected) {
+ String[] parts = line.split("\\|", -1);
+ if (parts.length != expected) {
+ throw new IllegalArgumentException("Invalid record arity.");
+ }
+ return parts;
+ }
+
+ private String safeOpId(OpId opId) {
+ try {
+ String s = opIdCodec.encode(opId);
+ if (s.length() > 30) { // NOPMD
+ return s.substring(0, 30) + "...";
+ }
+ return s;
+ } catch (RuntimeException ex) { // NOPMD
+ return "";
+ }
+ }
+
+ private String safeEndpoint(EndpointId endpointId) {
+ try {
+ String s = endpointCodec.encode(endpointId);
+ if (s.length() > 30) { // NOPMD
+ return s.substring(0, 30) + "...";
+ }
+ return s;
+ } catch (RuntimeException ex) { // NOPMD
+ return "";
+ }
+ }
+}
diff --git a/pki/src/main/java/zeroecho/pki/util/async/impl/package-info.java b/pki/src/main/java/zeroecho/pki/util/async/impl/package-info.java
new file mode 100644
index 0000000..0984886
--- /dev/null
+++ b/pki/src/main/java/zeroecho/pki/util/async/impl/package-info.java
@@ -0,0 +1,58 @@
+/*******************************************************************************
+ * Copyright (C) 2025, Leo Galambos
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ *
+ * 3. All advertising materials mentioning features or use of this software must
+ * display the following acknowledgement:
+ * This product includes software developed by the Egothor project.
+ *
+ * 4. Neither the name of the copyright holder nor the names of its contributors
+ * may be used to endorse or promote products derived from this software
+ * without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ ******************************************************************************/
+/**
+ * Reference implementations for the generic async bus.
+ *
+ *
+ * This package contains durability and recovery helpers (append-only line
+ * store) and a generic durable bus implementation.
+ *
+ *
+ *
Operational model
+ *
+ *
State is held in memory for speed.
+ *
Transitions are appended to an on-disk log for restart recovery.
+ *
Restart recovery replays the log and then relies on {@code sweep(...)} to
+ * synchronize open operations with their endpoints.
+ *
+ *
+ *
Security
+ *
+ * Implementations must never log secrets, payloads, private keys, or internal
+ * cryptographic primitive state. Persisted content must contain only
+ * identifiers and non-sensitive status metadata.
+ *
+ */
+package zeroecho.pki.util.async.impl;
diff --git a/pki/src/main/java/zeroecho/pki/util/async/package-info.java b/pki/src/main/java/zeroecho/pki/util/async/package-info.java
new file mode 100644
index 0000000..e4f7740
--- /dev/null
+++ b/pki/src/main/java/zeroecho/pki/util/async/package-info.java
@@ -0,0 +1,70 @@
+/*******************************************************************************
+ * Copyright (C) 2025, Leo Galambos
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ *
+ * 3. All advertising materials mentioning features or use of this software must
+ * display the following acknowledgement:
+ * This product includes software developed by the Egothor project.
+ *
+ * 4. Neither the name of the copyright holder nor the names of its contributors
+ * may be used to endorse or promote products derived from this software
+ * without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ ******************************************************************************/
+/**
+ * Generic asynchronous operation bus with durable tracking and callback
+ * dispatch.
+ *
+ *
+ * This package provides a reusable, strongly-typed abstraction for submitting
+ * asynchronous operations, tracking their status, and delivering status changes
+ * to registered callback handlers. It is intentionally generic: concrete
+ * subsystems (e.g., PKI issuance, publishing, signing workflows) instantiate
+ * the bus by selecting:
+ *
+ *
+ *
+ *
operation identifier type (e.g., {@code PkiId}),
+ *
operation owner type (e.g., {@code Principal}),
+ *
endpoint identifier type (e.g., {@code String}),
+ *
result type (application-specific, may be empty).
+ *
+ *
+ *
Durability model
+ *
+ * The default reference implementation is append-only: it persists operation
+ * snapshots and status transitions to a file log. On startup it replays the log
+ * and then optionally re-synchronizes open operations by asking the endpoint
+ * for their current status.
+ *
+ *
+ *
Security
+ *
+ *
No secrets must be logged.
+ *
Persistence encodings should avoid plaintext secrets. If sensitive result
+ * data exists, store it encrypted outside of this package or provide a redacted
+ * {@code ResultCodec}.
+ *
+ */
+package zeroecho.pki.util.async;
diff --git a/pki/src/main/java/zeroecho/pki/util/package-info.java b/pki/src/main/java/zeroecho/pki/util/package-info.java
new file mode 100644
index 0000000..22d4dd3
--- /dev/null
+++ b/pki/src/main/java/zeroecho/pki/util/package-info.java
@@ -0,0 +1,97 @@
+/*******************************************************************************
+ * Copyright (C) 2025, Leo Galambos
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ *
+ * 3. All advertising materials mentioning features or use of this software must
+ * display the following acknowledgement:
+ * This product includes software developed by the Egothor project.
+ *
+ * 4. Neither the name of the copyright holder nor the names of its contributors
+ * may be used to endorse or promote products derived from this software
+ * without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ ******************************************************************************/
+/**
+ * Utility types and helper functions for ZeroEcho PKI.
+ *
+ *
+ * This package contains small, focused utility components that support the PKI
+ * domain model and its surrounding infrastructure without introducing
+ * additional domain semantics. Utilities provided here are intended to be:
+ *
+ *
+ *
deterministic and side-effect free,
+ *
safe to use across different runtime environments,
+ *
independent of persistence, networking, or cryptographic providers.
+ *
+ *
+ *
Package structure
+ *
+ * The package is intentionally shallow at its top level and consists of:
+ *
+ *
+ *
A single direct utility class: {@link zeroecho.pki.util.PkiIds PkiIds},
+ * which provides helper methods for creation and handling of PKI identifiers in
+ * a consistent and deterministic manner.
+ *
Several subordinate utility subpackages, each grouping helpers for a
+ * specific technical concern (e.g. formatting, validation, or internal
+ * conventions), without polluting the top-level namespace.
+ *
+ *
+ *
Design principles
+ *
+ * Utilities in this package follow strict design rules:
+ *
+ *
+ *
no global mutable state,
+ *
no hidden caching with externally visible effects,
+ *
no dependency on environment-specific configuration,
+ *
no leakage of sensitive material (keys, secrets, plaintext).
+ *
+ *
+ *
+ * Where randomness or uniqueness is required (e.g. identifier generation), the
+ * semantics are explicitly documented and predictable within the guarantees of
+ * the underlying platform.
+ *
+ *
+ *
Relation to other layers
+ *
+ * This package is deliberately orthogonal to:
+ *
+ *
+ *
API domain packages (such as {@code zeroecho.pki.api.*}),
+ *
SPI and implementation layers,
+ *
application bootstrap and orchestration code.
+ *
+ *
+ *
+ * As such, utilities defined here may be freely reused by API, SPI, and
+ * implementation code without creating circular dependencies or architectural
+ * coupling.
+ *
- * Tests focus on filesystem semantics (write-once, history, snapshot export)
- * and avoid dependencies on optional domain factories. Where the API uses
- * interfaces (notably {@link AttributeSet}), tests provide a minimal
- * deterministic stub.
+ * The tests are deterministic and do not use reflection. Snapshot export
+ * semantics are tested using {@link FsPkiStoreOptions#strictSnapshotExport()}
+ * configured appropriately.
*
*
*
- * Every test routine prints its own name and prints {@code ...ok} on success.
- * Important intermediate values are printed with {@code "..."} prefix.
+ * Output conventions:
*
+ *
+ *
Each test prints its own name at the start.
+ *
Each test prints {@code ...ok} on success.
+ *
Important intermediate state is printed with {@code ...} prefix.
- * The production API defines {@link AttributeSet} as a passive interface
- * without factories. Tests only require an immutable empty set.
- *
+ * @param entries serialized entries
*/
- static final class EmptyAttributeSet implements AttributeSet {
+ public record TestAttributeSet(List entries) implements AttributeSet {
- @Override
- public Set ids() {
- return Set.of();
+ public record Entry(AttributeId id, List values) {
+
+ public Entry {
+ if (id == null) {
+ throw new IllegalArgumentException("id must not be null");
+ }
+ if (values == null) {
+ throw new IllegalArgumentException("values must not be null");
+ }
+ }
}
@Override
- public Optional get(final AttributeId id) {
- Objects.requireNonNull(id, "id");
+ public Set ids() {
+ return entries.stream().map(Entry::id).collect(Collectors.toUnmodifiableSet());
+ }
+
+ @Override
+ public Optional get(AttributeId id) {
+ if (id == null) {
+ throw new IllegalArgumentException("id must not be null");
+ }
+ for (Entry e : entries) {
+ if (e.id().equals(id)) {
+ if (e.values().isEmpty()) {
+ return Optional.empty();
+ }
+ return Optional.of(e.values().get(0));
+ }
+ }
return Optional.empty();
}
@Override
- public List getAll(final AttributeId id) {
- Objects.requireNonNull(id, "id");
+ public List getAll(AttributeId id) {
+ if (id == null) {
+ throw new IllegalArgumentException("id must not be null");
+ }
+ for (Entry e : entries) {
+ if (e.id().equals(id)) {
+ return List.copyOf(e.values());
+ }
+ }
return List.of();
}
}
+
+ /**
+ * Deterministic domain fixtures.
+ */
+ static final class TestObjects {
+
+ private static final AtomicLong SEQ = new AtomicLong(1L);
+
+ static CaRecord minimalCaRecord(String caId, CaState state) {
+ PkiId id = new PkiId(caId);
+
+ KeyRef issuerKeyRef = new KeyRef("issuer-key-" + caId);
+ SubjectRef subjectRef = new SubjectRef("CN=" + caId);
+
+ Credential cred = minimalCredential("CA-" + caId, "profile-ca");
+ List caCredentials = List.of(cred);
+
+ return new CaRecord(id, CaKind.ROOT, state, issuerKeyRef, subjectRef, caCredentials);
+ }
+
+ static CertificateProfile minimalProfile(String profileId, boolean active) {
+ FormatId formatId = new FormatId("fmt-x509");
+ String displayName = "Profile " + profileId;
+
+ List required = List.of(new AttributeId("req-1"));
+ List optional = List.of(new AttributeId("opt-1"));
+
+ Optional maxValidity = Optional.of(Duration.ofDays(365));
+
+ return new CertificateProfile(profileId, formatId, displayName, required, optional, maxValidity, active);
+ }
+
+ static Credential minimalCredential(String serial, String profileId) {
+ PkiId credentialId = new PkiId("cred-" + nextSeq());
+ FormatId formatId = new FormatId("fmt-x509");
+
+ IssuerRef issuerRef = new IssuerRef(new PkiId("issuer-" + nextSeq()));
+ SubjectRef subjectRef = new SubjectRef("CN=subj-" + nextSeq());
+
+ Instant notBefore = Instant.EPOCH;
+ Instant notAfter = Instant.EPOCH.plusSeconds(60L);
+ Validity validity = new Validity(notBefore, notAfter);
+
+ PkiId publicKeyId = new PkiId("pk-" + nextSeq());
+
+ CredentialStatus status = CredentialStatus.ISSUED;
+
+ EncodedObject encoded = minimalEncodedObject();
+ AttributeSet attrs = emptyAttributes();
+
+ return new Credential(credentialId, formatId, issuerRef, subjectRef, validity, serial, publicKeyId,
+ profileId, status, encoded, attrs);
+ }
+
+ static RevokedRecord minimalRevocation(String credentialId, Instant when, RevocationReason reason) {
+ PkiId id = new PkiId(credentialId);
+ AttributeSet attrs = emptyAttributes();
+ return new RevokedRecord(id, when, reason, attrs);
+ }
+
+ static AttributeSet emptyAttributes() {
+ return new TestAttributeSet(List.of());
+ }
+
+ private static EncodedObject minimalEncodedObject() {
+ return new EncodedObject(Encoding.DER, new byte[] { 0x01, 0x02, 0x03 });
+ }
+
+ private static String nextSeq() {
+ return String.format("%016x", Long.valueOf(SEQ.getAndIncrement()));
+ }
+ }
}
diff --git a/pki/src/test/java/zeroecho/pki/impl/fs/FsCodecTest.java b/pki/src/test/java/zeroecho/pki/impl/fs/FsCodecTest.java
new file mode 100644
index 0000000..c8cc5b9
--- /dev/null
+++ b/pki/src/test/java/zeroecho/pki/impl/fs/FsCodecTest.java
@@ -0,0 +1,112 @@
+/*******************************************************************************
+ * Copyright (C) 2025, Leo Galambos
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ *
+ * 3. All advertising materials mentioning features or use of this software must
+ * display the following acknowledgement:
+ * This product includes software developed by the Egothor project.
+ *
+ * 4. Neither the name of the copyright holder nor the names of its contributors
+ * may be used to endorse or promote products derived from this software
+ * without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ ******************************************************************************/
+package zeroecho.pki.impl.fs;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.time.Duration;
+import java.util.List;
+import java.util.Optional;
+
+import org.junit.jupiter.api.Test;
+
+import zeroecho.pki.api.FormatId;
+import zeroecho.pki.api.attr.AttributeId;
+import zeroecho.pki.api.profile.CertificateProfile;
+
+/**
+ * Tests for {@link FsCodec}.
+ */
+public final class FsCodecTest {
+
+ @Test
+ void booleanEncodingIsCompactAndDecodesIntoPrimitiveExpectation() {
+ System.out.println("booleanEncodingIsCompactAndDecodesIntoPrimitiveExpectation");
+
+ byte[] data = FsCodec.encode(Boolean.TRUE);
+
+ // MAGIC + TAG + payload => 3 bytes
+ System.out.println("...encoded length=" + data.length);
+ assertTrue(data.length <= 5);
+
+ // decode as Boolean
+ Boolean b = FsCodec.decode(data, Boolean.class);
+ assertEquals(Boolean.TRUE, b);
+
+ // decode as primitive expectation via Boolean.class cast still OK in callers;
+ // actual primitive/wrapper tolerance is inside the codec.
+ Object any = FsCodec.decode(data, Object.class);
+ assertEquals(Boolean.TRUE, any);
+
+ System.out.println("booleanEncodingIsCompactAndDecodesIntoPrimitiveExpectation...ok");
+ }
+
+ @Test
+ void recordRoundTripUsesCompactEncoding() {
+ System.out.println("recordRoundTripUsesCompactEncoding");
+
+ CertificateProfile p = new CertificateProfile("profile-a", new FormatId("fmt-x509"), "Profile A",
+ List.of(new AttributeId("req-1")), List.of(new AttributeId("opt-1")), Optional.of(Duration.ofDays(365)),
+ true);
+
+ byte[] data = FsCodec.encode(p);
+ System.out.println("...encoded length=" + data.length);
+ assertTrue(data.length > 0);
+
+ CertificateProfile decoded = FsCodec.decode(data, CertificateProfile.class);
+ assertEquals(p, decoded);
+
+ System.out.println("recordRoundTripUsesCompactEncoding...ok");
+ }
+
+ @Test
+ void listAndOptionalRoundTripUsesCompactEncoding() {
+ System.out.println("listAndOptionalRoundTripUsesCompactEncoding");
+
+ List list = List.of("a", "b");
+ byte[] listData = FsCodec.encode(list);
+ Object decodedList = FsCodec.decode(listData, Object.class);
+ assertEquals(list, decodedList);
+
+ Optional opt = Optional.of(Integer.valueOf(7));
+ byte[] optData = FsCodec.encode(opt);
+ Object decodedOpt = FsCodec.decode(optData, Object.class);
+ assertEquals(opt, decodedOpt);
+
+ System.out.println("listAndOptionalRoundTripUsesCompactEncoding...ok");
+ }
+
+}
diff --git a/pki/src/test/java/zeroecho/pki/util/async/DurableAsyncBusTest.java b/pki/src/test/java/zeroecho/pki/util/async/DurableAsyncBusTest.java
new file mode 100644
index 0000000..257dda8
--- /dev/null
+++ b/pki/src/test/java/zeroecho/pki/util/async/DurableAsyncBusTest.java
@@ -0,0 +1,152 @@
+/*******************************************************************************
+ * Copyright (C) 2025, Leo Galambos
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ *
+ * 3. All advertising materials mentioning features or use of this software must
+ * display the following acknowledgement:
+ * This product includes software developed by the Egothor project.
+ *
+ * 4. Neither the name of the copyright holder nor the names of its contributors
+ * may be used to endorse or promote products derived from this software
+ * without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ ******************************************************************************/
+package zeroecho.pki.util.async;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.nio.file.Path;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Optional;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import zeroecho.pki.api.PkiId;
+import zeroecho.pki.api.audit.Principal;
+import zeroecho.pki.impl.async.PkiCodecs;
+import zeroecho.pki.util.async.codec.ResultCodec;
+import zeroecho.pki.util.async.impl.AppendOnlyLineStore;
+import zeroecho.pki.util.async.impl.DurableAsyncBus;
+
+public class DurableAsyncBusTest {
+
+ @TempDir
+ Path tempDir;
+
+ @Test
+ public void submitUpdateConsume_removesResultAndOperation() {
+ System.out.println("submitUpdateConsume_removesResultAndOperation");
+
+ Path log = tempDir.resolve("async.log");
+ System.out.println("...log=" + log.getFileName());
+
+ AppendOnlyLineStore store = new AppendOnlyLineStore(log);
+
+ DurableAsyncBus bus = new DurableAsyncBus(
+ PkiCodecs.PKI_ID, PkiCodecs.PRINCIPAL, PkiCodecs.STRING, new ResultCodec() {
+ @Override
+ public String encode(String result) {
+ return result;
+ }
+
+ @Override
+ public String decode(String token) {
+ return token;
+ }
+ }, store);
+
+ PkiId opId = new PkiId("op-1");
+ Principal owner = new Principal("USER", "alice");
+ bus.registerEndpoint("endpoint-1", new AsyncEndpoint() {
+ @Override
+ public Optional status(PkiId id) {
+ return Optional.empty();
+ }
+
+ @Override
+ public Optional result(PkiId id) {
+ return Optional.empty();
+ }
+ });
+
+ bus.submit(opId, "ISSUE_CERT", owner, "endpoint-1", Instant.parse("2025-01-01T00:00:00Z"), Duration.ofHours(1));
+
+ AsyncStatus ok = new AsyncStatus(AsyncState.SUCCEEDED, Instant.parse("2025-01-01T00:00:10Z"), Optional.of("OK"),
+ java.util.Map.of("note", "done"));
+
+ bus.update(opId, ok, Optional.of("RESULT-ABC"));
+
+ Optional consumed = bus.consumeResult(opId);
+ System.out.println("...consumed=" + (consumed.isPresent() ? consumed.get() : ""));
+ assertTrue(consumed.isPresent());
+ assertEquals("RESULT-ABC", consumed.get());
+
+ assertTrue(bus.snapshot(opId).isEmpty());
+ assertTrue(bus.status(opId).isEmpty());
+
+ System.out.println("...ok");
+ }
+
+ @Test
+ public void replay_restoresSnapshotAndStatus() {
+ System.out.println("replay_restoresSnapshotAndStatus");
+
+ Path log = tempDir.resolve("async.log");
+ System.out.println("...log=" + log.getFileName());
+
+ AppendOnlyLineStore store1 = new AppendOnlyLineStore(log);
+
+ DurableAsyncBus bus1 = new DurableAsyncBus(
+ PkiCodecs.PKI_ID, PkiCodecs.PRINCIPAL, PkiCodecs.STRING, ResultCodec.none(), store1);
+
+ PkiId opId = new PkiId("op-2");
+ Principal owner = new Principal("SERVICE", "issuer");
+ bus1.submit(opId, "CRL", owner, "endpoint-1", Instant.parse("2025-01-01T00:00:00Z"), Duration.ofHours(2));
+
+ AsyncStatus running = new AsyncStatus(AsyncState.RUNNING, Instant.parse("2025-01-01T00:00:05Z"),
+ Optional.of("RUNNING"), java.util.Map.of());
+ bus1.update(opId, running, Optional.empty());
+
+ AppendOnlyLineStore store2 = new AppendOnlyLineStore(log);
+
+ DurableAsyncBus bus2 = new DurableAsyncBus(
+ PkiCodecs.PKI_ID, PkiCodecs.PRINCIPAL, PkiCodecs.STRING, ResultCodec.none(), store2);
+
+ Optional> snap = bus2.snapshot(opId);
+ Optional st = bus2.status(opId);
+
+ System.out.println("...snapshotPresent=" + snap.isPresent());
+ System.out.println("...statusPresent=" + st.isPresent());
+
+ assertTrue(snap.isPresent());
+ assertTrue(st.isPresent());
+ assertEquals(AsyncState.RUNNING, st.get().state());
+ assertEquals("CRL", snap.get().type());
+
+ System.out.println("...ok");
+ }
+}