Convert MARC Binary to TAMU DC

Currently we have no official tooling for converting MARC binary to Dublin Core. Until we do, you can use a script like this one. Any part of it (file names, field mappings, cleanup rules) can be changed to suit your collection.

import csv
import pymarc

# Path of the MARC binary file that main() opens for reading (binary mode).
INPUT_FILE = "Feb2-Mark-InstanceUUIDs-932.mrc"
# Path of the CSV file that main() writes.
OUTPUT_FILE = "woellhof.csv"

# CSV column order. These are also the keys of every row dict built by
# extract_record(), so the two must be kept in sync.
DC_HEADERS = [
    "dc.title",
    "dc.identifier",
    "dc.creator",
    "dcterms.extent",
    "dcterms.spatial",
    "dc.coverage",
    "dc.publisher",
    "dcterms.issued",
    "dcterms.type",
    "dc.subject",
    "dc.type",
    "dc.language",
    "dc.rights",
    "dcterms.abstract",
    "dc.description",
    "dcterms.isPartOf",
    "dc.format",
]

# Display names for the three-letter language codes looked up by
# get_language(). A code that is not listed here is emitted unchanged.
LANGUAGE_CODES = {
    "lat": "Latin",
    "eng": "English",
    "ger": "German",
    "fre": "French",
    "dut": "Dutch",
    "ita": "Italian",
    "spa": "Spanish",
    "por": "Portuguese",
    "ara": "Arabic",
    "gre": "Greek",
    "grc": "Greek, Ancient",
    "tur": "Turkish",
}


def get_field(record, tag):
    """Return the first field of ``record`` carrying ``tag``.

    Args:
        record: Object exposing ``get_fields(tag)`` (a pymarc record).
        tag: MARC tag to look up, e.g. ``"245"``.

    Returns:
        The first matching field, or ``None`` when the record has none.
    """
    matches = record.get_fields(tag)
    if not matches:
        return None
    return matches[0]


def get_subfields(field, codes):
    """Join the cleaned values of the requested subfields with spaces.

    Values are gathered code by code, in the order given by ``codes``. Each
    value is stripped of surrounding whitespace and of trailing periods.

    Args:
        field: Object exposing ``get_subfields(code)`` (a pymarc field).
        codes: Iterable of subfield codes to read.

    Returns:
        The space-joined values, or ``""`` when nothing matched.
    """
    cleaned = [
        raw.strip().rstrip(".").strip()
        for code in codes
        for raw in field.get_subfields(code)
    ]
    return " ".join(cleaned)


def get_language(record):
    """Return the record's language(s) as a ``||``-joined string.

    Every non-blank 041 $a is used, in record order. Only when no usable
    041 $a exists does the function fall back to positions 35-37 of the 008
    field. Codes found in ``LANGUAGE_CODES`` are replaced by their display
    name; unknown codes are passed through unchanged.

    Args:
        record: Object exposing ``get_fields(tag)`` (a pymarc record).

    Returns:
        The joined language names/codes, or ``""`` when none is recorded.
    """
    langs = []
    for f041 in record.get_fields("041"):
        for code in f041.get_subfields("a"):
            code = code.strip()
            # A blank $a must not count as a language: it would otherwise
            # block the 008 fallback below and emit an empty value.
            if code:
                langs.append(LANGUAGE_CODES.get(code, code))
    if langs:
        return "||".join(langs)

    f008 = get_field(record, "008")
    # Tolerate a missing field or a field without usable ``data`` instead of
    # relying on a blanket try/except.
    data = getattr(f008, "data", None) if f008 else None
    code = (data or "")[35:38].strip()
    # "|||" is the fill value meaning "no attempt to code".
    if code and code != "|||":
        return LANGUAGE_CODES.get(code, code)
    return ""


# MARC subject tags harvested for dc.subject, in output order.
_SUBJECT_TAGS = ("600", "610", "650", "651")
# Subfield codes excluded from subject strings (MARC control/linkage
# subfields such as $0 and $2).
_SUBJECT_SKIP_CODES = frozenset({"0", "1", "2", "4", "5", "6", "8"})


def _subfield_values(record, tag, *codes, trim_period=False):
    """Return stripped values of ``codes`` from every ``tag`` field, in order."""
    values = []
    for field in record.get_fields(tag):
        for value in field.get_subfields(*codes):
            value = value.strip()
            values.append(value.rstrip(".") if trim_period else value)
    return values


def _subfield_pairs(field):
    """Return ``(code, value)`` pairs for either pymarc subfield layout.

    Handles both a list of objects with ``code``/``value`` attributes and a
    flat list of alternating code and value strings. A field without
    subfields yields an empty list instead of raising.
    """
    subfields = list(getattr(field, "subfields", None) or [])
    if not subfields:
        return []
    if isinstance(subfields[0], str):
        return list(zip(subfields[::2], subfields[1::2]))
    return [(sub.code, sub.value) for sub in subfields if hasattr(sub, "code")]


def _subject_strings(record):
    """Build one ``--``-joined heading per 600/610/650/651 field."""
    subjects = []
    for tag in _SUBJECT_TAGS:
        for field in record.get_fields(tag):
            parts = [
                value.strip().rstrip(".")
                for code, value in _subfield_pairs(field)
                if code not in _SUBJECT_SKIP_CODES
            ]
            if parts:
                subjects.append("--".join(parts))
    return subjects


def extract_record(record):
    """Extract Dublin Core fields from a single MARC record.

    Args:
        record: A pymarc record (anything exposing ``get_fields``).

    Returns:
        A dict with one string value per Dublin Core column. Repeatable
        values are joined with ``||``; a column with no data is ``""``.
    """
    row = {}

    # dc.title: 245 $a then $b, without the trailing "/" and final period.
    f245 = get_field(record, "245")
    if f245:
        title_parts = [
            value.strip().rstrip("/").strip()
            for code in ("a", "b")
            for value in f245.get_subfields(code)
        ]
        row["dc.title"] = " ".join(title_parts).rstrip(".").strip()
    else:
        row["dc.title"] = ""

    # dc.identifier: the 001 first, then every 035 $a (system control
    # numbers, e.g. OCLC).
    identifiers = _subfield_values(record, "035", "a")
    f001 = get_field(record, "001")
    if f001:
        identifiers.insert(0, f001.data.strip())
    row["dc.identifier"] = "||".join(identifiers)

    # dc.creator: 100 $a $d.
    f100 = get_field(record, "100")
    if f100:
        row["dc.creator"] = get_subfields(f100, ["a", "d"]).rstrip(",").strip()
    else:
        row["dc.creator"] = ""

    # dcterms.extent: 300 $a $b $c.
    f300 = get_field(record, "300")
    row["dcterms.extent"] = get_subfields(f300, ["a", "b", "c"]) if f300 else ""

    # dcterms.spatial: the first $a of each 651 (geographic subject).
    spatials = []
    for f651 in record.get_fields("651"):
        place = f651.get_subfields("a")
        if place:
            spatials.append(place[0].strip().rstrip("."))
    row["dcterms.spatial"] = "||".join(spatials)

    # dc.coverage: 255 $a (scale) and $c (coordinates), space-joined.
    # NOTE: 034 is not read here; add it if coded coordinates are needed
    # for a GeoJSON crosswalk.
    row["dc.coverage"] = " ".join(_subfield_values(record, "255", "a", "c"))

    # dc.publisher / dcterms.issued: 264 $b / $c. Records without a 264
    # fall back to 260, which uses the same $b/$c, so they do not lose
    # their imprint.
    imprint = get_field(record, "264") or get_field(record, "260")
    publishers = imprint.get_subfields("b") if imprint else []
    dates = imprint.get_subfields("c") if imprint else []
    row["dc.publisher"] = (
        publishers[0].strip().rstrip(",").strip("[]") if publishers else ""
    )
    row["dcterms.issued"] = (
        dates[0].strip().rstrip(".").strip("[]") if dates else ""
    )

    # dcterms.type: 336 $a (content type).
    row["dcterms.type"] = "||".join(_subfield_values(record, "336", "a"))

    # dc.subject: full subject strings from 600/610/650/651.
    row["dc.subject"] = "||".join(_subject_strings(record))

    # dc.type: 655 $a (genre/form).
    row["dc.type"] = "||".join(
        _subfield_values(record, "655", "a", trim_period=True)
    )

    # dc.language: 041 $a, falling back to the 008.
    row["dc.language"] = get_language(record)

    # dc.rights: 540 $a. Included although we will always get dc.rights
    # from the curator.
    f540 = get_field(record, "540")
    row["dc.rights"] = get_subfields(f540, ["a"]) if f540 else ""

    # dcterms.abstract: 520 $a.
    row["dcterms.abstract"] = "||".join(_subfield_values(record, "520", "a"))

    # dc.description: 500 $a (general notes) followed by 546 $a (language
    # notes).
    notes = _subfield_values(record, "500", "a")
    notes.extend(_subfield_values(record, "546", "a"))
    row["dc.description"] = "||".join(notes)

    # dcterms.isPartOf: 700 entries that carry a $t (related work title,
    # prefixed with the $a name when present), then 773 $t/$a. Drop this
    # column if it makes no sense for the collection.
    parts_of = []
    for f700 in record.get_fields("700"):
        names = f700.get_subfields("a")
        for title in f700.get_subfields("t"):
            entry = title.strip().rstrip(".")
            if names:
                entry = names[0].strip().rstrip(",") + ". " + entry
            parts_of.append(entry)
    parts_of.extend(
        _subfield_values(record, "773", "t", "a", trim_period=True)
    )
    row["dcterms.isPartOf"] = "||".join(parts_of)

    # dc.format: 338 $a (carrier type).
    row["dc.format"] = "||".join(_subfield_values(record, "338", "a"))

    return row


def main():
    """Convert ``INPUT_FILE`` (MARC binary) into ``OUTPUT_FILE`` (CSV).

    Every readable record becomes one CSV row with the ``DC_HEADERS``
    columns. Records the reader could not parse are skipped, but each one is
    reported and the total is printed, so data loss is never silent.
    """
    rows = []
    skipped = 0
    with open(INPUT_FILE, "rb") as fh:
        reader = pymarc.MARCReader(fh)
        for position, record in enumerate(reader, start=1):
            if record is None:
                # The reader hands back None for a record it could not
                # parse; report it (with the reader's reason when the
                # installed pymarc exposes one) rather than dropping it.
                skipped += 1
                reason = getattr(reader, "current_exception", None)
                print(f"Skipping unreadable record #{position}: {reason}")
                continue
            rows.append(extract_record(record))

    with open(OUTPUT_FILE, "w", newline="", encoding="utf-8") as fh:
        writer = csv.DictWriter(fh, fieldnames=DC_HEADERS)
        writer.writeheader()
        writer.writerows(rows)

    print(f"Wrote {len(rows)} records to {OUTPUT_FILE}")
    if skipped:
        print(f"Skipped {skipped} unreadable records")


if __name__ == "__main__":
    main()