# Moose / Streaming / From Your Code Documentation – Python

## Included Files

1. moose/streaming/from-your-code/from-your-code.mdx

## Publish Data

Source: moose/streaming/from-your-code/from-your-code.mdx

Write data to streams from applications, APIs, or external sources

# Publishing Data to Streams

## Overview

Publishing data to streams lets you write data from various sources into your Kafka/Redpanda topics. This is the first step in building real-time data pipelines.

## Publishing Methods

### Using REST APIs

The most common way to publish data is through Moose's built-in ingestion APIs. When enabled, an ingestion API sits in front of your stream and publishes the body of each request it receives to that stream:

```py filename="PublishViaAPI.py" copy
import requests
from pydantic import BaseModel
from moose_lib import IngestPipeline, IngestPipelineConfig, Key

# Example model matching the payload sent below
class RawData(BaseModel):
    id: Key[str]
    value: int

# When you create an IngestPipeline with ingest_api=True, Moose automatically creates an API endpoint
raw_data = IngestPipeline[RawData]("raw_data", IngestPipelineConfig(
    ingest_api=True,  # Creates POST /ingest/raw_data endpoint
    stream=True,
    table=True
))

# You can then publish data via HTTP POST requests.
# The host below assumes a local dev server on localhost:4000; replace it with your own.
response = requests.post('http://localhost:4000/ingest/raw_data', json={
    'id': '123',
    'value': 42
})
```

See the [OpenAPI documentation](/stack/open-api) to learn more about how to generate type-safe client SDKs in your language of choice for all of your Moose APIs.

### Direct Stream Publishing

You can publish directly to a stream from your Moose code using the stream's `send` method. This is useful when emitting events from workflows or other backend logic. `send` accepts a single record or a list of records.

If your `Stream` is configured with `schemaConfig.kind = "JSON"`, Moose automatically produces messages using the Confluent envelope (0x00 + schema id + JSON). No code changes are needed beyond setting `schemaConfig`. See the [Schema Registry guide](/moose/streaming/schema-registry).
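To make the envelope description concrete, here is a minimal sketch of the framing, assuming the standard Confluent wire format: one zero magic byte, a 4-byte big-endian schema id, then the JSON payload. The helper names `frame_json` and `unframe_json` are hypothetical and are not part of `moose_lib`; Moose performs this framing for you, so the sketch is only meant to show what a consumer would see on the wire.

```python
import json
import struct

MAGIC_BYTE = 0  # leading 0x00 byte of the envelope


def frame_json(schema_id: int, record: dict) -> bytes:
    """Hypothetical helper: prefix a JSON payload with magic byte + schema id."""
    payload = json.dumps(record).encode("utf-8")
    # ">bI" = big-endian, 1 signed byte (magic) + 4-byte unsigned int (schema id)
    return struct.pack(">bI", MAGIC_BYTE, schema_id) + payload


def unframe_json(message: bytes) -> tuple[int, dict]:
    """Hypothetical helper: split an enveloped message into (schema_id, record)."""
    magic, schema_id = struct.unpack(">bI", message[:5])
    if magic != MAGIC_BYTE:
        raise ValueError(f"unexpected magic byte: {magic}")
    return schema_id, json.loads(message[5:])


framed = frame_json(7, {"id": "evt_1"})
print(framed[:5])            # the 5-byte header
print(unframe_json(framed))  # round-trips to the schema id and record
```

A consumer that reads such a topic with a plain Kafka client would need to skip the first five bytes before parsing the JSON body.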
For example, define a stream and publish to it with `send`:

```py filename="DirectPublish.py" copy
from moose_lib import Stream, StreamConfig, Key
from pydantic import BaseModel
from datetime import datetime

class UserEvent(BaseModel):
    id: Key[str]
    user_id: str
    timestamp: datetime
    event_type: str

# Create a stream (optionally pass StreamConfig with destination/table settings)
events = Stream[UserEvent]("user_events", StreamConfig())

# Publish a single record
events.send(UserEvent(
    id="evt_1",
    user_id="user_123",
    timestamp=datetime.now(),
    event_type="click"
))

# Publish multiple records
events.send([
    UserEvent(id="evt_2", user_id="user_456", timestamp=datetime.now(), event_type="view"),
    UserEvent(id="evt_3", user_id="user_789", timestamp=datetime.now(), event_type="signup"),
])
```

Moose builds the Kafka topic name from your stream name, optional namespace, and optional version (dots in the version become underscores). For example, a stream named `events` with version `1.2.0` becomes `events_1_2_0` (or `my_ns.events_1_2_0` when the namespace is `"my_ns"`).

### Using the Kafka/Redpanda Client from External Applications

You can also publish to streams from external applications using Kafka/Redpanda clients:

```py filename="ExternalPublish.py" copy
import json
from datetime import datetime

from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers=['localhost:19092'],
    # Serialize each record as UTF-8 encoded JSON
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

# Publish to the stream topic.
# The topic name is derived from the stream name (plus namespace and version, if set).
producer.send('user_events', {
    'id': 'event-123',
    'user_id': 'user-456',
    'timestamp': datetime.now().isoformat(),
    'event_type': 'page_view'
})

# send() is asynchronous; flush so the record is delivered before the script exits
producer.flush()
```

#### Locating Redpanda Connection Details

When you run your Moose backend in your local dev environment, the connection details for your Redpanda cluster are in the `moose.config.toml` file at the root of your project:

```toml filename="moose.config.toml" copy
[redpanda_config]
broker = "localhost:19092"
message_timeout_ms = 1000
retention_ms = 30000
replication_factor = 1
```